zabbix 网络线路质量监控,自定义python模块

Zabbix 网络质量监控,自定义 Python 模块,集成 ICMP/TCP/UDP 探测,可批量监控线路质量;当丢包率或时延超过自定义阈值时联动 mtr,保存线路故障日志并发送至 NOC 邮箱

互联网故障一般表现为丢包和时延增大,持续性故障不难排查,难的是间歇性或凌晨故障,后者往往来不及等我们测试就已经恢复正常,得不到异常时的mtr无法判断故障点在哪里

故此有了根据丢包率和时延变化联动mtr的需求

前段时间使用MySQL实现了这个功能,缺点是占用太多系统资源,且脚本繁重,优点是数据可复用,可以做多种形式的展示

后续改用socket+deque实现,低消耗且轻量,也可以通过开放互联网API来做分布式监控,缺点是历史数据不留存,用完即丢

系统环境

  Ubuntu 18.04.5 LTS+Python 3.6.9 

python库

  自带基本库,考虑到系统权限问题没有使用第三方库

权限

需要赋予执行用户icmp,tcp,udp权限,log和mtr_log文件夹需要变更所有者为执行用户

tree

#!/usr/bin/env python3
#-*-coding:utf-8-*-
from collections import deque
import itertools,time
import json
import argparse,sys,re,os,subprocess
import time,socket,random,string
import threading
from functools import reduce
import logging


# Queue of probe targets waiting for a worker thread: Server.server appends
# to it and Server.create pops from it.
ipqli=deque()
# Resolved path of the running script (from sys.argv[0]); used to relaunch
# the script in server mode and to look for that server in the process list.
filename = os.path.realpath(sys.argv[0])
def logger():
    """Create the file logger used by this script.

    The log file is named ``log`` and lives in the directory of the running
    script (resolved from ``sys.argv[0]``).  A ``FileHandler`` is attached to
    the root logger and the root level is set to ``DEBUG``.

    Returns:
        logging.Logger: the root logger with the file handler attached.
    """
    base_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    log_name = base_dir+'/log'
    root = logging.getLogger()
    # Messages written by this script contain non-ASCII text.  Without an
    # explicit encoding the handler uses the locale encoding, which is ASCII
    # under a C/POSIX locale on Python 3.6 and makes those writes fail.
    fh = logging.FileHandler(log_name, encoding='utf-8')
    formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formater)
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)
    return root
# Module-wide logger, created once when the module is loaded.
log = logger()
#ping程序,避免系统权限问题未使用ping3
class Ping:
    """Probe one target over ICMP, TCP or UDP and aggregate the samples.

    Args:
        ip: five-item sequence of strings
            ``(source_ip, target_ip, type, port, interval)``; ``type`` is
            ``icmp``, ``tcp`` or ``udp`` (case-insensitive).
        count: number of probes sent per call to :meth:`ping_value`.
        udp_length: payload size, in characters, of each UDP probe.

    Raises:
        ValueError: when ``port`` or ``interval`` is not numeric.
        KeyError: when the result deques / event for this target are not
            present in the module globals.
    """
    def __init__(self,ip,count=20,udp_length=64):
        ip = tuple(ip)
        self.sip,self.tip,self.type,self.port,self.inver=ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count=count
        self.inver = float(self.inver)
        self.udp_length=udp_length
        key = ''.join(ip).replace('.','')
        # Per-instance sample buffers.  These used to be created by writing
        # into locals() and reading the entry back, which only works while
        # locals() returns a persistent dict (no longer true after PEP 667).
        self.restime_deque = deque(maxlen=60)
        self.pkloss_deque = deque(maxlen=60)
        # Shared result deques and "data ready" event, looked up by name in
        # the module globals.
        self.ret_restime_deque = globals()['restime_deque'+key]
        self.ret_pkloss_deque = globals()['pkloss_deque'+key]
        self.ipqevent = globals()['event'+key]
        # Round-trip time in ping output; also matches integer values such
        # as "time=5 ms".
        self.compile= r'(?<=time=)\d+(?:\.\d+)?(?= ms)'
    def _record(self,rtt):
        """Store one sample and return 1 for a reply, 0 for a loss.

        ``rtt`` is the round-trip time in milliseconds, or ``None`` when the
        probe got no answer (stored as time 0 / loss 1).
        """
        if rtt is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
            return 0
        self.restime_deque.append(rtt)
        self.pkloss_deque.append(0)
        return 1
    def _pace(self,start_time):
        """Return how long to sleep so probes are ``self.inver`` seconds apart."""
        usetime = time.time()-start_time
        return self.inver - usetime if usetime<self.inver else self.inver
    def _tcp(self):
        """Time one TCP connect; return ``(sleep_time, 1 if answered else 0)``."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(1)
        start_time = time.time()
        res_count=0
        try:
            s.bind((self.sip,0))
            s.connect((self.tip, self.port))
            res_count = self._record((time.time() - start_time)*1000)
        except (socket.timeout,ConnectionError):
            self._record(None)
        except OSError as e:
            # Local socket errors (e.g. the source address cannot be bound)
            # are logged and counted as a loss so that every probe yields
            # exactly one sample for ping_value to consume.
            log.debug(e)
            self._record(None)
        finally:
            s.close()
        return self._pace(start_time),res_count
    def _udp(self):
        """Time one UDP echo; return ``(sleep_time, 1 if answered else 0)``."""
        res_count=0
        data=''.join(random.choice(string.ascii_letters+ string.digits) for x in range(self.udp_length))
        s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        s.settimeout(1)
        start_time = time.time()
        try:
            s.bind((self.sip,0))
            sent_at = time.time()
            s.sendto(data.encode('utf-8'),(self.tip,self.port))
            s.recv(1024)
            res_count = self._record((time.time() - sent_at)*1000)
        except socket.timeout:
            self._record(None)
        except OSError as e:
            # Same policy as _tcp: log and count as a loss.
            log.debug(e)
            self._record(None)
        finally:
            s.close()
        return self._pace(start_time),res_count
    def _icmp(self):
        """Run one system ``ping``; return ``(sleep_time, 1 if answered else 0)``."""
        res_count=0
        start_time = time.time()
        # Argument list instead of a shell string: the addresses are passed
        # to ping verbatim and never interpreted by a shell.
        cmd = ['ping','-i',str(self.inver),'-c','1','-W','1','-I',str(self.sip),str(self.tip)]
        try:
            ret = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8','replace')
        except OSError as e:
            # ping could not be started at all.
            log.debug(e)
            ret = ''
        found = re.search(self.compile, ret)
        if found:
            res_count = self._record(float(found.group()))
        else:
            self._record(None)
        return self._pace(start_time),res_count
    def fastping(self):
        """Send a single probe of the configured type (thread target)."""
        getattr(self, '_'+self.type)()
    def slow_ping(self):
        """Send up to ``self.count`` paced probes.

        Returns:
            tuple: ``(probes_sent, probes_answered)``.
        """
        index = 0
        res_count=0
        # Wake up sendvalue: once this thread reaches slow_ping the fast
        # rounds are finished, so the result deques hold data to serve.
        self.ipqevent.set()
        while index<self.count:
            sleep_time,count=getattr(self, '_'+self.type)()
            index+=1
            res_count+=count
            # Stop early while the shared result deques are nearly empty so
            # a value becomes available quickly.
            if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2 :
                break
            time.sleep(sleep_time)
        return index,res_count
    def ping_value(self):
        """Run one measurement round and aggregate it.

        While the shared result deques hold fewer than two entries,
        ``self.count`` probes are sent in parallel threads; otherwise the
        paced :meth:`slow_ping` is used.

        Returns:
            tuple: ``(restime, pkloss, use_time)`` - mean round-trip time in
            ms over the answered probes (0.0 when none answered), loss in
            percent, and the seconds the round took.  ``(0, 0, 0)`` when no
            sample could be aggregated.
        """
        start_time = time.time()
        count = self.count
        if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2:
            workers = [threading.Thread(target=self.fastping) for x in range(self.count)]
            for th in workers:
                th.start()
            for th in workers:
                th.join()
        else:
            count,_ = self.slow_ping()
        use_time = round(time.time()-start_time,4)
        # Never pop more samples than were actually recorded.
        count = min(count,len(self.restime_deque),len(self.pkloss_deque))
        li = [self.restime_deque.pop() for x in range(count)]
        pkli = [self.pkloss_deque.pop() for x in range(count)]
        try:
            lost = sum(int(x) for x in pkli)
            received = count-lost
            # Lost probes are stored as 0 ms, so the mean is taken over the
            # answered probes only, in both the fast and the slow path.
            restime = sum(float(x) for x in li)/received if received else 0.0
            pkloss = lost/count*100
            return (round(restime,2),round(pkloss,2),use_time)
        except (ZeroDivisionError,TypeError,ValueError) as e:
            log.debug(e)
            return 0,0,0
#server端代码
class Server():
    """Local TCP service (127.0.0.1:6599) that serves probe results.

    A client sends a JSON list ``[ip, item]`` where ``ip`` is the five-item
    target description and ``item`` is ``restime`` or ``pkloss``.  The first
    request for a target registers its result deques and event in the module
    globals and queues the target so that a probing thread is created.
    """
    def __init__(self,sock):
        global ipqli
        self.ipqli=ipqli
        self.thli=[]
        self.ipli = []
        self.sock=sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.env = threading.Event()
    @classmethod
    def start(cls):
        """Bind the listening socket and run the accept and create threads."""
        try:
            s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            address = ('127.0.0.1',6599)
            s.bind(address)
            obj = cls(s)
            ping_server=threading.Thread(target=obj.server)
            ping_server.start()
            obj.thli.append(ping_server)
            create_t = threading.Thread(target=obj.create)
            create_t.start()
            obj.thli.append(create_t)
            for t in obj.thli:
                t.join()
        except Exception as e:
            log.debug(e)
    def server(self):
        """Accept clients forever and answer one request per connection."""
        self.sock.listen(200)
        while True:
            # conn stays None when accept() itself fails, so the cleanup
            # below never touches an unbound name.
            conn = None
            try:
                conn,addr = self.sock.accept()
                data = json.loads(conn.recv(1024).decode('utf-8'))
                ip,item = data
                key = ''.join(ip).replace('.','')
                restime_ipq = 'restime_deque'+key
                history_ipq = 'history_deque'+key
                pkloss_ipq = 'pkloss_deque'+key
                ipqevent = 'event'+key
                if ip not in self.ipli:
                    globals()[restime_ipq] = deque(maxlen=100)
                    globals()[history_ipq] = deque(maxlen=100)
                    globals()[pkloss_ipq] = deque(maxlen=100)
                    globals()[ipqevent] = threading.Event()
                    self.ipqli.append(ip)
                    self.ipli.append(ip)
                    log.debug('create ipdeque %s %s'%(restime_ipq,pkloss_ipq))
                    # Tell create() that a new target needs a thread.
                    self.env.set()
                self.sendvalue(conn,ip,item)
            except Exception as e:
                log.debug(str(e))
            finally:
                if conn is not None:
                    conn.close()
    def create(self):
        """Start one probing thread for every target queued in ``ipqli``."""
        while True:
            # Block until server() has queued a new target.
            self.env.wait()
            try:
                ip = self.ipqli.pop()
                log.debug('create %s'%ip)
                t=threading.Thread(target=self.makevalue,args=(ip,))
                t.start()
            except Exception as a:
                log.debug(str(a))
            if not self.ipqli:
                self.env.clear()

    def makevalue(self,ip):
        """Probe ``ip`` until both of its result deques hold 100 entries.

        The target is then removed from ``ipli`` so that the next request
        for it registers fresh deques and starts a new probing thread.
        """
        key = ''.join(ip).replace('.','')
        restime_name = 'restime_deque'+key
        pkloss_name = 'pkloss_deque'+key
        restime_ipq = globals()[restime_name]
        pkloss_ipq = globals()[pkloss_name]
        obj = Ping(ip)
        while len(restime_ipq) < 100 or len(pkloss_ipq) <100:
            restime,pkloss,use_time=obj.ping_value()
            restime_ipq.append((restime,use_time))
            pkloss_ipq.append((pkloss,use_time))
        # The former "del" statements only removed local names; the global
        # entries are simply replaced on the next registration.
        self.ipli.remove(ip)
        log.debug('delete ipdeque %s %s'%(restime_name,pkloss_name))
    def _launch_mtr(self,mtr_cmd):
        """Start the mtr helper script without waiting for it."""
        # Argument list (no shell) because the values come from the client
        # request; DEVNULL because the output is never read.
        subprocess.Popen(mtr_cmd,stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL)
    def sendvalue(self,conn,ip,item):
        """Send the newest ``item`` value for ``ip`` to ``conn``.

        A latency increase of more than 30 over the previously served value,
        or a loss strictly between 20 and 100 percent, starts the mtr helper.
        When no value can be produced nothing is sent; the caller closes the
        connection.
        """
        fromat_ip=''.join(ip).replace('.','')
        tip=ip[1]
        restime_name = 'restime_deque'+fromat_ip
        history_name = 'history_deque'+fromat_ip
        pkloss_name = 'pkloss_deque'+fromat_ip
        ipqevent_name = 'event'+fromat_ip
        restime_ipq = globals()[restime_name]
        history_ipq = globals()[history_name]
        pkloss_ipq = globals()[pkloss_name]
        ipqevent = globals()[ipqevent_name]
        mtr_dir = self.basedir+'/mtr_log/'+tip+'-'+time.strftime('%Y-%m-%d',time.localtime()) + '.log'
        mtr_cmd = [self.basedir + '/mtr'] + list(ip) + [mtr_dir]
        if len(restime_ipq) < 2 and len(pkloss_ipq) <2:
            # Too little data buffered: make wait() below block.
            ipqevent.clear()
        ret = None
        try:
            # Released by Ping.slow_ping once results are available.
            ipqevent.wait()
            if item =='restime':
                ret,use_time = restime_ipq.pop()
                if len(history_ipq) != 0:
                    hisret = history_ipq.pop()
                else:
                    hisret = ret
                history_ipq.append(ret)
                if ret - hisret >30:
                    log.debug(tip+'延迟波动{} {}'.format(hisret,ret))
                    self._launch_mtr(mtr_cmd)
            elif item =='pkloss':
                ret,use_time = pkloss_ipq.pop()
                if 100> ret  >20:
                    log.debug(tip+'丢包率{}%'.format(ret))
                    self._launch_mtr(mtr_cmd)
        except Exception as a:
            log.debug(str(a)+ restime_name)
        if len(restime_ipq) > 80 and len(pkloss_ipq) > 80:
            restime_ipq.clear()
            pkloss_ipq.clear()
        if ret is None:
            # Unknown item or no data: previously this surfaced as a
            # NameError on the unbound "ret".
            log.debug('no value for %s %s'%(item,restime_name))
            return
        conn.sendall(str(ret).encode())

#用户输入IP格式检查
class Ipcheck():
    """Validate the command-line values before they are sent to the server.

    Args:
        sip: source address; an empty/None value is accepted as "not given".
        tip: destination address (dotted-quad IPv4).
        item: ``restime`` or ``pkloss``.
        ping_type: ``icmp``, ``tcp`` or ``udp`` (case-insensitive).
        inver: probe interval in seconds, at least 0.2.

    Raises:
        ValueError: from the constructor when ``inver`` is not numeric.
    """
    # Four dot-separated groups of ASCII digits; the range of each group is
    # checked separately by check_fun.
    _DOTTED_QUAD = r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+'
    def __init__(self,sip,tip,item,ping_type,inver):
        self.sip =sip
        self.tip=tip
        self.item=item
        self.type = ping_type.lower()
        self.inver=float(inver)
    def check(self):
        """Return True when item, type, interval and addresses are all valid."""
        if self.item not in ('restime','pkloss'):
            return False
        if self.type not in ('icmp','tcp','udp'):
            return False
        if self.inver<0.2:
            return False
        return self.checkipformat()
    def check_fun(self,ip):
        """Return True when one address group is below 256."""
        return int(ip)<256
    def checkipformat(self):
        """Return True when ``tip`` (and ``sip``, if given) is a valid IPv4 address."""
        try:
            # fullmatch rejects a trailing newline, which "$" would accept;
            # the addresses are later placed on command lines.
            if not re.fullmatch(self._DOTTED_QUAD, self.tip):
                return False
            groups = self.tip.split('.')
            if self.sip:
                if not re.fullmatch(self._DOTTED_QUAD, self.sip):
                    return False
                groups = groups+self.sip.split('.')
            return self.checkiplength(groups)
        except (TypeError,ValueError,AttributeError):
            # Non-string input such as a missing destination.
            return False
    def checkiplength(self,check_ipli):
        """Return True when every group in ``check_ipli`` passes check_fun."""
        return all(self.check_fun(group) for group in check_ipli)
def run():
    """Start this script in server mode (``-S server``) as a background process.

    The child is launched with the current interpreter and an argument list,
    so no shell is involved and the script does not need to be executable.
    Its output goes to DEVNULL: nobody reads it, and the launching process
    usually exits long before the server does.
    """
    subprocess.Popen([sys.executable,filename,'-S','server'],
                     stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL)
#socket_client端,向server请求数据并返回给用户

def killpid():
    """Send SIGTERM to every running server instance of this script.

    A server is any process, other than the current one, whose command line
    contains ``<script name> -S server``.  Failures to signal a process are
    ignored on purpose: this is best-effort cleanup.
    """
    import signal
    marker = '%s -S server'%os.path.basename(filename)
    # One ps call with header-less pid and command-line columns; parsing the
    # list here avoids the shell pipeline that also matched its own grep.
    listing = subprocess.Popen(['ps','-e','-o','pid=','-o','args='],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
    for line in listing.decode(errors='replace').splitlines():
        fields = line.split(None,1)
        if len(fields) != 2 or marker not in fields[1]:
            continue
        try:
            pid = int(fields[0])
        except ValueError:
            continue
        if pid == os.getpid():
            continue
        try:
            os.kill(pid,signal.SIGTERM)
        except (ProcessLookupError,PermissionError):
            # Already gone, or owned by another user.
            pass


def socket_client(ip,item):
    """Ask the local server for one value and print it.

    Args:
        ip: the five-item target description sent to the server.
        item: ``restime`` or ``pkloss``.

    On a timeout the server is assumed to be stuck: it is killed and the
    process exits.  On any other error ``server will start`` is printed and
    the process exits.  The exit status is 0 in both cases.
    """
    try:
        # The context manager closes the socket on every path, including
        # the error paths.
        with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
            s.settimeout(3)
            s.connect(('127.0.0.1',6599))
            s.sendall(json.dumps([ip,item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        killpid()
        sys.exit(0)
    except Exception as e:
        print('server will start')
        log.debug(str(e))
        sys.exit(0)
        
if __name__ == '__main__':
    # Command-line entry point.  With "-S server" the process becomes the
    # server; otherwise it validates the arguments, asks the server for one
    # value and prints it.  The server is started first when it is not
    # found in the process list.
    parser = argparse.ArgumentParser(description='Example: newpingd -t 10.0.0.1  -s 10.0.3.108  -I restime OR\
                                     newpingd -t 10.0.0.1 -s 10.0.3.108  -I restime  -T tcp -p 80')
    parser.add_argument('-S',action = 'store',dest='server',help='The first time to run the program automatically carry, do not specify')
    parser.add_argument('-t',action = 'store',dest='tip',help='Destination ip')
    parser.add_argument('-s',action = 'store',dest='sip',help='Source ip')
    parser.add_argument('-I',action='store',dest='item',help='Item to report: restime or pkloss')
    parser.add_argument('-i',action='store',dest='inver',default='1',help='Packet sending interval default 1')
    parser.add_argument('-T',action='store',dest='ping_type',default='icmp',help='Protocol default icmp')
    parser.add_argument('-p',action='store',dest='port',default='0',help='Destination port default 0')
    args= parser.parse_args()
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16"%filename
    server_status  = subprocess.Popen(server_status_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)
    try:
        tip = socket.gethostbyname(args.tip)
        sip = args.sip
        item = args.item
        ping_type = args.ping_type
        port = args.port
        inver=args.inver
        ip=(sip,tip,ping_type,port,inver)
    except (TypeError,OSError,UnicodeError):
        # Missing (-t not given) or unresolvable destination: show the usage.
        print('''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------newpingd -t 10.0.0.1 -s 10.0.3.108  -I restime  -T tcp -p 80-------
------newpingd -t 10.0.0.1  -s 10.0.3.108  -I restime        
        '''
        )
        sys.exit(0)
    try:
        valid = Ipcheck(sip, tip, item,ping_type,inver).check()
    except ValueError:
        # A non-numeric -i value is an invalid argument, not a crash.
        valid = False
    if not valid:
        print('ip格式错误')
        sys.exit(0)
    socket_client(ip,item)

mtr.py

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import sys,logging,os,subprocess
import json
import smtplib
from email.utils import formataddr
from email.mime.text import MIMEText
def logger(ip,log_name):
    """Attach a file handler for ``log_name`` to the root logger.

    ``ip`` is accepted but not used.  The root logger is switched to DEBUG
    and returned.
    """
    file_handler = logging.FileHandler(log_name)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(file_handler)
    return root
class sendemail:
    """Plain-text notification mail.

    Sender, recipients, SMTP server and credentials are read from the
    module-level names ``mail_from``, ``mail_list``, ``smtp_server``,
    ``user`` and ``passwd``.
    """
    def __init__(self,content,subject):
        self.content = content
        self.subject = subject
    def sendemail(self):
        """Send the mail; a failure is logged as a warning, never raised."""
        msg = MIMEText(self.content,'plain','utf-8')
        msg['from'] = formataddr(mail_from)
        msg['to'] = ','.join(mail_list)
        msg['subject'] = self.subject
        try:
            # The context manager ends the SMTP session on success and on
            # failure alike.
            with smtplib.SMTP(smtp_server,timeout=5) as service:
                service.login(user,passwd)
                service.sendmail(user,mail_list,msg.as_string())
        except Exception as e:
            # Best effort: mail problems must not abort the caller, but
            # they should no longer vanish without a trace.
            logging.getLogger(__name__).warning('mail not sent: %s',e)
def mtr(sip,tip,t_ype,port,log_name):
    """Run one mtr report towards ``tip`` and append it to ``log_name``.

    Args:
        sip: source address handed to ``mtr -a``.
        tip: destination address.
        t_ype: ``icmp`` for the default probe, otherwise the protocol name
            passed as ``--<t_ype>`` together with ``-P port``.
        port: destination port, used for non-ICMP probes only.
        log_name: path of the log file to append the report to.

    A mail with the report is sent only when the log file's name is not yet
    present in ``BaseDir/mtr_log``.
    """
    argv = ['mtr','-a',str(sip),'-r','-n','-c','30','-w','-b']
    if t_ype!='icmp':
        argv += ['--%s'%t_ype,'-P',str(port)]
    argv.append(str(tip))
    # Same text the shell command used to have; kept for the log entry.
    cmd = ' '.join(argv)
    try:
        # Argument list, no shell: the values arrive on the command line of
        # this script and must not be interpreted by a shell.
        data = subprocess.Popen(argv,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8','replace')
    except OSError as e:
        data = 'mtr could not be started: %s'%e
    log_content = cmd + '\n' + data
    if  log_name.split('/')[-1] not in os.listdir(BaseDir+'/mtr_log'):
        title = '%s到 %s 线路异常'%(hostname,tip_localtion)
        mail = sendemail(data,title)
        mail.sendemail()
    log = logger(tip,log_name)
    log.debug(log_content)
if __name__ =='__main__':
    # Usage: mtr <sip> <tip> <type> <port> <interval> <log file>
    # Settings are read from setting.json next to this script.
    _,sip,tip,t_ype,port,inv,log_name = sys.argv
    BaseDir = os.path.dirname(os.path.realpath(sys.argv[0]))
    # Explicit UTF-8: the settings may contain non-ASCII names and the
    # locale encoding is not reliable for a background process.
    with open(BaseDir+"/setting.json",'r',encoding='utf-8') as a:
        conf = json.load(a)
    mail_from = conf['mailfrom']
    user = conf['user']
    passwd = conf['passwd']
    try:
        hostname = conf['hostname'][sip]
    except KeyError:
        # Unknown source address: use the default host name.
        hostname = conf['hostname']['default']
    mail_list = conf['mail_list']
    smtp_server = conf['smtp_server']
    try:
        tip_localtion = conf['ip_localtion'][tip]
    except (KeyError,TypeError):
        # No location configured for this destination: show the address.
        tip_localtion = tip
    mtr(sip,tip,t_ype,port,log_name)

setting.json 

{
    "smtp_server":"smtp.qq.com",
    "user":"976584601@qq.com",
    "passwd":"xxxx",
    "mailfrom":["dark","976584601@qq.com"],
    "hostname":{"129.225.142.51":"南非","115.148.125.32":"菲律宾","default":"南非"},
    "mail_list":["xxxx@xxxx.com","cs11241991@163.com"],
    "ip_localtion":{
                    "146.185.173.26":"Amsterdam",
                    "108.61.198.200":"Amsterdam",
                    "172.247.36.37":"Frankfurt",
                    "158.177.119.118":"Frankfurt",
                    "107.155.16.239":"Frankfurt",
                    "108.61.170.254":"Frankfurt",
                    "103.219.22.126":"London",
                    "108.61.196.36":"London",
                    "140.204.196.5":"Madrid",
                    "185.157.232.103":"Manchester",
                    "176.126.83.222":"Milan",
                    "128.1.55.57":"Moscow",
                    "107.155.10.38":"Moscow",
                    "185.175.56.111":"Oslo",
                    "108.61.123.142":"Paris",
                    "151.80.238.250":"Roubaix",
                    "88.150.168.1":"Serbia",
                    "8.211.29.219":"阿里",
                    "185.175.58.114":"Varna"
                    }
}

udp探测需要服务器端开启对应端口

#!/usr/bin/env python3
# Minimal UDP echo responder for the UDP probe: every datagram received is
# sent back unchanged to its sender.
import socket
# Replace 'ipaddress' and port with the local address and UDP port to
# listen on.  The socket is created and bound once; re-creating it for each
# datagram leaves short gaps in which nothing listens on the port, and a
# probe arriving in such a gap would be counted as lost.
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind(('ipaddress',port))
while True:
    data,addr = sock.recvfrom(65535)
    sock.sendto(data,addr)

也可以使用socat,但实际测试发现socat会引入额外开销,导致时延不准确

socat -v UDP-LISTEN:4000,fork PIPE

Leave a Reply

Your email address will not be published. Required fields are marked *

X