Tommonkey

All greatness comes from a brave beginning

0%

python-concurrent.futures模块

concurrent.futures模块包含了threading和multiprocessing,及多线程和多进程功能。最近也是在写自己的信息收集和漏洞扫描程序。所以对concurrent.futures模块使用的比较多,这里也是想记录下。对于任务量不大的cpu和io运算,我一般只用threading来处理,而遇到大批量复杂的任务我会用多进程+多线程同时处理,比如在我最近开发的程序中要对大批量IP进行全端口扫描,就非常适合这种模式,运行效率很nice~

concurrent.futures多线程使用

多线程代码示例,不说废话,直接看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import concurrent.futures
import time

def doTask(T):
print(T)
time.sleep(1)

def threadingTask():
taskList = []
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
for i in range(1,10):
task = executor.submit(doTask,i)
taskList.append(task)
for res in concurrent.futures.as_completed(taskList):
getResult = res.result()

if __name__ == "__main__":
threadingTask()

代码非常简单,一眼就食之无味了~

concurrent.futures下的多进程+多线程使用

代码示列,这段代码是我本来想写到程序里面的端口扫描模块,十几秒的时间发送6万多个包误差较大,准确性也不如nmap,后来放弃了。无奈只能在程序里面调用nmap来进行端口扫描。真是无奈的选择……但是经过测试,这段多进程+多线程的的模式用来扫5000以内的端口量那速度是又快又准。大概1-3秒就可以出结果。配置是4进程+20000线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import socket
import concurrent.futures

# 存活探测
def alive_scan(ip):
port_list = [22, 23, 53, 80, 443, 3389, 10022]
port_len = len(port_list)
num = 0
for port in port_list:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((ip, port))
sock.close()
return True
except socket.error:
num += 1
if num == port_len:
return False
else:
continue

# 全端口探测
# 利用socket tcp进行探测端口存活
def all_port_scan(ip,port,live_port):
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock_https = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
sock.connect((ip, port))
sock_https.connect((ip,port))
sock.close()
live_port.append(port)
return live_port
except socket.error as error:
# print(error)
# print(port)
return 1

# 线程调度模块
def threadMode(ip):
thread_list = []
live_port = []
num = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=20000) as executor:
print("目标:{},端口探测中".format(ip))
# 这里选择探测端口的范围,这里也可以改成列表形式,自定义扫描端口。当然也可以改成(1,65536)进行全端口探测,只是发包太大,结果失真。
for port in range(1,6000):
# num+=1
# if num in [5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000]:
# time.sleep(3)
thread_task = executor.submit(all_port_scan,ip,port,live_port)
thread_list.append(thread_task)
for res in concurrent.futures.as_completed(thread_list):
result = res.result()
if result != 0 and result != 1:
# 返回存活端口列表
return result
else:
return 1

# 进程,线程之间的调度逻辑中心模块
def power_control_mode(ip_list):
process_list = []
# 这里通过concurrent.futures创建四个进程,每个进程在分配到各自需要探测的IP后后会传入多线程模块,启动多线程~
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as process_executor:
for ip in ip_list:
process_task = process_executor.submit(threadMode,ip)
process_list.append(process_task)
for res in concurrent.futures.as_completed(process_list):
port_live = res.result()
if port_live != 1:
print("存活端口列表")
print(port_live)
else:
pass


if __name__ == "__main__":
iplist = ["1.1.1.1"]
live_ip = []
for i in iplist:
status = alive_scan(i)
print("目标:{}:{}".format(i,status))
live_ip.append(i)
if status:
power_control_mode(live_ip)

OVER~

奖励作者买杯可乐?