python 批量探测服务端开放的TCP端口

现在大多服务器都有做icmp限制或直接禁掉,导致我们业务去连接服务器异常时无法判断是程序问题还是网络问题,所以写一个简单探测tcp端口脚本来探测服务器所开放的端口,再使用tcp测试双向时延来排掉网络问题

#!/usr/bin env python
#-*-coding:utf-8-*-
import socket,time
from concurrent import futures
import signal,os,argparse
import ipaddress as dst
from sys import exit

#---------------------------------------------信号侦听模块------------------------------------------
def signal_handler(signal, frame):
    show()
    os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
#----------------------------------------------扫描函数-------------------------------------------------
socket.setdefaulttimeout(0.1)
def show():
    time.sleep(0.1)
    if not q.empty():
        while not q.empty():
            q_dic = q.get()
            for ip in q_dic:
                print('扫描结束,此次扫描的主机%s,开放的端口有%s'%(ip,q_dic[ip]))
    else:
        print()
        print('扫描结束,此次扫描的主机,没有开放的TCP端口')
class Scan:
    def __init__(self,ip):
        self.port_li=[]
        self.success = {}
        self.port,self.host = ip
        self.host = str(self.host)
        global q
        self.q = q
        self.start()
    def portrange(self):
        startport,maxport = self.port.split('-')
        startport,maxport= int(startport),int(maxport)
        endport = maxport+1 if maxport< startport + 200 else startport +200
        portrange_list=[(startport,endport)]
        while endport < maxport:
            startport,endport=endport,endport+200
            if endport >= maxport:
                endport = maxport+1
            portrange_list.append((startport,endport))
        return portrange_list
    def scan(self,portrange):
        startport,endport = portrange
        for port in range (startport,endport):
            s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.settimeout(0.2)
            s.bind((sip,0))
            try:
                s.connect((self.host,port))
                try:
                    port_type = s.recv(1024).decode().strip()
                    print('%s-%d端口开启,%s'%(self.host,port,port_type))
                except:
                    print('%s-%d端口开启'%(self.host,port))
                self.port_li.append(port)
            except:
                pass
            s.close()    
    def start(self):
        portrange_list = self.portrange()
        with futures.ThreadPoolExecutor(100) as executor:
            res = executor.map(self.scan, [portrange for portrange in portrange_list])
        if self.port_li:
            self.success[self.host]=self.port_li
            self.q.put(self.success)
if __name__=='__main__':
    from multiprocessing import Process,Queue
    #接收参数
    parser = argparse.ArgumentParser(description='ip_scan')
    parser.add_argument('-s',action = 'store',dest='sip')
    parser.add_argument('-t',action = 'store',dest='tip')
    parser.add_argument('-p',action='store',dest='port')
    args= parser.parse_args()
    try:
        sip = args.sip
        dip = dst.ip_network(args.tip)
        port = args.port
    except ValueError as format_error:
        print(format_error)
        print('-----------------example--------------')
        print('tcpportstatus.py -s 127.0.0.1 -t 10.0.0.0/24 -p 0-65535')
        exit(0)
    q = Queue()
    print('-----------------------------------开始扫描------------------------------------')
    starttime = time.time()
    print('扫描地址数量%s,端口%s'%(dip.num_addresses,port))
    
    with futures.ThreadPoolExecutor(100) as executor:
        res = executor.map(Scan, [(port,host) for host in dip])
    usetime = time.time()-starttime+1
    print('--------------------------扫描结束用时%s秒----------------------'%int(usetime))
    show()

多进程多线程扫描,一个ip一个进程,根据扫描机器cpu性能可以调整startport和endport的参数来跑更多线程,以我的设备为例,Intel(R) Xeon(R) CPU E3-1220 V2 @ 3.10GHz ,扫描50个ip地址65535以下端口用时71秒

Leave a Reply

Your email address will not be published. Required fields are marked *

X