好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

nginx日志分析工具

nginx日志分析工具

日志正则匹配

nginx的日志输出为如下的格式:

 183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0 

现在需要把 remote ip ,请求时间,请求方法,请求url以及请求协议等使用正则表达式的分组功能一一匹配出来,正则的匹配模式书写如下:

 (?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)" 

在书写类似的正则时不要陷入目标数据对应的正则书写,如 [07/Apr/2017:09:32:39 +0800] ,可以理解为在一个 [] 中括号中出现了许多字符,但不会包含 [ , ] 这样的符号,所以就可写成 \[(?P&lt;datetime&gt;[^\[\]]+)\] 这样的形式。

代码验证:

 import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re测试数据pile(pattern)

matcher = regex.match(logfile)
if matcher:
    print(matcher.groupdict().items())
 

输出

 dict_items([('remote', '183.69.210.164'), ('datetime', '07/Apr/2017:09:32:39 +0800'), ('method', 'GET'), ('url', '/member/'), ('protocol', 'HTTP/1.1'), ('status', '302'), ('size', '31'), ('useragent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0')]) 

这样就得到一个类字典的数据,把一条日志是的各个部份进行了分解,但各个分组中的数据类型都是字符串类型。像 ('datetime', '07/Apr/2017:09:32:39 +0800') 是一个时间,可以转换成时间类型, ('status', '302') 是状态码,可以转换为整形,以便后续在分析时直接就拿到相应的数据类型。

日志中数据类型转换

对字符串时间进行数据转换

 import datetime

s = '07/Apr/2017:09:32:39 +0800'
dt = datetime.datetime.strptime(s, '%d/%b/%Y:%H:%M:%S %z')
print(type(dt), dt) 

输出

 <class 'datetime.datetime'> 2017-04-07 09:32:39+08:00 

对status和size可以直接使用int进行数据类型转换,这种类型转换函数可以单独定义在一个字典中,当一个功能来提供,如下:

 ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
} 

这样当日志格式变化后,可以很灵活在 opt 这个字典中增加相应的转换函数。

实现单行日志抽取

结合上边的转换函数就可以把一行日志转换成字典,该字典存放了正则匹配出的分组信息,并相应的数据已进行了类型转换,发便后期分析时使用。完整代码如下:

 import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re测试数据pile(pattern)

def extract(line):
    matcher = regex.match(line)
    if matcher:
        info = {}
        for k, v in matcher.groupdict().items():
            info[k] = ops.get(k, lambda x: x)(v)  # 巧用字典的get方法及默认值是一个匿名函数
        return info
        # 上边的代码可以用一行实现
        # return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    else:
        return None

print(extract(logfile)) 

运行后输出

 {'remote': '183.69.210.164', 'datetime': datetime.datetime(2017, 4, 7, 9, 32, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'url': '/member/', 'protocol': 'HTTP/1.1', 'status': 302, 'size': 31, 'useragent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'} 

输出一个字典,相应的数据类型也完成了转换。

日志处理窗口函数

日志文件被读取并一条一条的送达给窗口函数,窗口函数需要缓存指定一段时间内的的日志,因日志是以时间序列产生的,当这个日志中的时间差值大于定义的 窗口 大小时,表明这时我们就需要对已缓存的日志进行分析,如:在这个 时间窗口 内访问的 url 有哪些,各个 url 访问的数量,如果在一个时间窗口内对一个url访问的数量特别的多,那可能这个url在被cc***,这时就可以触发告警,告知相关人员。

窗口函数

先看一个窗口函数的事例:

 import random
import datetime
import time

def source():
    # 模拟数据源
    while True:
        tz_utc_8 = datetime.timezone(datetime.timedelta(hours=8))   # 创建时区UTC+8:00
        datetime_now = datetime.datetime.now()
        dt = datetime_now.replace(tzinfo=tz_utc_8)  # 强制设置为UTC+8:00
        yield {'value': random.randint(1, 100), 'datetime': dt}
        time.sleep(1)

def window(src, handler, width: int, interval: int):
    """
    窗口函数
    :param src: 数据源,一个生成器
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒,表示在nginx的日志文件中的请求时间的差值在于interval时,就需要调用handler函数进行处理在这段时间内的这一批日志
    :return:
    """

    # start和current都设置为 Linux 元年时间,后边会被日志中的时间替代,作初始化值用
    start = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []  # 时间窗口内日志的缓存列表
    while True:
        for x in src:
            if x:
                # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)}
                buffer.append(x)
                current = x['datetime']

            # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了
            if (current - start).total_seconds() >= interval:
                ret = handler(buffer)  # 数据处理函数
                start = current  # 时间点移动

                # buffer中的数据处理
                # x['datetime'] > current - delta 满足这个条件的数据需要保留
                buffer = [x for x in buffer if x['datetime'] > current - delta]

def do_nothing_handler(iterable: list):
    # 模拟处理函数
    print(iterable)
    print(len(iterable))

window(source(), do_nothing_handler, 8, 6) 

运行后输出

 [{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
1
[{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 56, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 8, 967087, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 70, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 9, 971149, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 10, 975855, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 77, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 11, 978261, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
7
[{'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 7, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 14, 987289, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 13, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 15, 990374, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 96, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 16, 992201, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 17, 992857, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8
[{'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 89, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 21, 3320, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 45, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 22, 7064, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 23, 9548, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 24, 11991, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8
[{'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 27, 23564, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 11, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 28, 27336, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 29, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 29, 29979, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 21, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 30, 34198, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 79, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 31, 37143, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 62, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 32, 39595, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8 

当运行稳定后,每次处理的buffer中的数据为8条,即为 width 的值,而 interval 设置为6,这说明上一次buffer中的数据和下一次buffer中的数据有重复的,这在某些场景是允许有数据重复,当 width = interval 时,buffer中的数据不会有重复,如果 width < interval 时,这时就有时间丢失了,这是不能允许出现的。

整合日志处理
 import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re测试数据pile(pattern)

# 这里还会面临一个问题,如果有一行日志因一些原因不是按照定义的格式记录的,那“regex.match(line)”会是一个None,
# 那上边的代码就会报异常。所以这里还会面临异常的处理,加判断即可
def extract(line):
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}  # ops.get(k, lambda x: x)(v) 转换相应的值
    else:
        return None

def window(src, handler, width: int, interval: int):
    """
    窗口函数
    :param src: 数据源,生成器
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒,表示在nginx的日志文件中的请求时间的差值在于interval时,就需要调用handler函数进行处理在这段时间内的这一批日志
    :return: None
    """

    start = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')  # 也可以设置为None
    current = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []
    for x in src:
        if x:
            # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)}
            buffer.append(x)
            current = x['datetime']
        if (current - start).total_seconds() >= interval:  # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了
            ret = handler(buffer)  # 数据处理函数
            start = current  # 时间点移动

            # buffer中的数据处理
            # x['datetime'] > current - delta 满足这个条件的数据需要保留
            buffer = [x for x in buffer if x['datetime'] > current - delta]

# 装载函数
def load(path):
    with open(path) as f:
        for line in f:
            field = extract(line)
            if field:
                yield field
            else:
                continue  # 解析失败时抛弃或打印日志

# 处理函数
def do_nothing_handler(iterable: list):
    print(iterable)
    print(len(iterable))

# test.log 日志文件为测试文件
window(load('test.log'), do_nothing_handler, 300, 300) 

引入队列和分发器

对日志的分析函数多种多样,如:在时间窗口内请求的状态码的占比,在时间窗口内请求url的数量,各种useragent访问的数量(不是时间窗口内的统计)等。而这些分析函数往往是需要并行执行的,所以这里也会引入分发器,在分发器中使用多线程来执行相应的分析处理函数。

分发器事例
 def dispatcher(src):  # src参数是一个生成器,即是数据源,经过extract函数处理过一行数据
    queues = []
    threads = []

    def reg(handler, width, interval):
        # handler: 分析函数
        # width: 时间窗口
        # interval: 分析函数调用时间间隔
        # 注册时需要分配消费者函数window各自的队列,创建各自的线程对象
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        # 启动线程
        for t in threads:
            t.start()

        # 把日志装载函数load返回的数据put进各个消费者的队列,类似于广播形式
        for x in src:
            for q in queues:
                q.put(x)

    return reg, run # 返回注册函数和运行函数 

原 window 函数的第一个参数为 src 一个生成器,现在加入分发器后,第一参数变成了一个 q 队列,所以 window 函数会做相应的变更。

状态码分析函数

在一个时间窗口内请求状态码的占比能反应出后端服务的健康状况。在 window 这个窗口函数中会把一个指定时间内的日志append到一个list中,如果触发了时间窗口,那就会调用相应的 handler 函数对list中的日志进行分析,所以状态码分析函数接收一个list作为参数。

 from collections import defaultdict

def status_handler(iterable: list):
    status = defaultdict(lambda: 0)  # 使用默认字典,初始化值为0
    for item in iterable:
        key = item['status']
        status[key] += 1
    total = sum(status.values())
    # total = len(iterable)  # 数据总条数求值有多种方式,也可以上for循环中计数
    result = {k: v/total*100 for k, v in status.items()} # 求状态码在该时间窗口内所占的百分比
    # print(result)
    return result  # 返回一个字典, {'200': 95, '502': 2.4} 
useragent统计函数

客户端请求时使用的是什么浏览器也可以进行统计,只是这个统计不针对时间窗口内的日志进行统计,只针对时间窗口内统计意义不太大,而是需要进行累计统计。对useragent有专门的包可以对其进行解析,名称为 user_agents ,先看一个事例:

 from user_agents import parse  # pip install user-agents

ua = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"

u = parse(ua)
print(u.browser)
print(u.browser.family, u.browser.version_string) 

输出

 Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0')
Sogou Explorer 1.0 

所以可以利用 user_agents 包解析出user agent的名称和相应的版本。

要对user agent进行统计,那在对日志文件中的一行日志文件进行转换时,需要增加对user agent做相应的转换,在 opt 这个字典中增加相应的转换函数,如下:

 from user_agents import parse

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda useragent: parse(useragent)
} 

User agent统计代码如下:

 # 统计结果放在函数useragent_handler之外,记录当前处理的日志文件,如果放在useragent_handler内则是统计时间窗口内的日志条数
ua_dict = defaultdict(lambda: 0)

# useragent分析函数
def useragent_handler(iterable: list):
    for item in iterable:
        ua = item['useragent'] # ua是一个 user_agents.parsers.UserAgent 对象
        key = (ua.browser.family, ua.browser.version_string) # key为一个元组
        ua_dict[key] += 1
    # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True))  # 以useragent的数量升序排序
    return ua_dict 

日志分析工具完整代码

 import re
import datetime
from queue import Queue
import threading
from collections import defaultdict
from pathlib import Path
from user_agents import parse

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda useragent: parse(useragent)
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re测试数据pile(pattern)

# 日志抽取函数
def extract(line) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    else:
        return None

def window(src: Queue, handler, width: int, interval: int):
    """
    窗口函数
    :param src: 是一个queue
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒
    :return:
    """

    start = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')  # 也可以设置为None
    current = datetime.datetime.strptime('1970/01/01 0 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []
    while True:
        data = src.get()  # 阻塞模式
        if data:
            buffer.append(data)
            current = data['datetime']
        if (current - start).total_seconds() >= interval:  # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了
            ret = handler(buffer)  # 数据处理函数
            start = current  # 时间点移动
            # buffer中的数据处理
            # x['datetime'] > current - delta 满足这个条件的数据需要保留,结合“01-日志分析回顾及数据载入”的图形看
            buffer = [x for x in buffer if x['datetime'] > current - delta]
            print(ret)  # ret是触发调用函数的执行返回值,在此可以窗口内的分析结果进行相应的判断,比如502的状态码占比大于多少时做什么

# 抽取出日志文件读取函数,是一个生成器函数
def open_file(path: str):
    with open(str(path)) as f:
        for line in f:
            field = extract(line)
            if field:
                yield field
            else:
                continue  # 解析失败时抛弃或打印日志

# 装载函数,可受多个路径,可以是目录,如果是目录只迭代一该目录下的所有文件,目录忽略
def load(*path):
    for item in path:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():  # 只处理文件,目录不管
                    yield from open_file(str(file))

        elif p.is_file():
            yield from open_file(str(p))

# 处理函数
def do_nothing_handler(iterable: list):
    print(iterable)
    print(len(iterable))

# 状态码分析处理函数,返回各状态码的占比
def status_handler(iterable: list):
    status = defaultdict(lambda: 0)
    for item in iterable:
        key = item['status']
        status[key] += 1
    total = sum(status.values())
    # total = len(iterable)  # 数据总条数求值有多种方式,也可以上for循环中计数
    result = {k: v/total*100 for k, v in status.items()}
    # print(result)
    return result

# 统计结果放在函数useragent_handler之外,记录当前处理的日志文件,如果放在useragent_handler内则是统计时间窗口内的日志条数
ua_dict = defaultdict(lambda: 0)

# useragent分析函数
def useragent_handler(iterable: list):
    for item in iterable:
        ua = item['useragent']  # ua是一个 user_agents.parsers.UserAgent 对象
        key = (ua.browser.family, ua.browser.version_string)  # key为一个元组
        ua_dict[key] += 1
    # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True))  # 以useragent的数量升序排序
    return ua_dict

# 分发器函数
# 1. 每一个window函数拥有自己的队列,相应的处理函数,以及时间窗口及处理时间间隔
# 2. window函数相当于就是生产者消费者模型中的消费者,在实际的业务环境中消费者可能许多,拿到相同的一份数据后进行各自的handler处理
def dispatcher(src):  # src参数是一个生成器,即是数据源
    queues = []
    threads = []

    def reg(handler, width, interval):
        # 注册时需要分配消费都函数window各息的队列,创建各息的线程对象
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        # 启动线程
        for t in threads:
            t.start()

        # 把日志装载函数load返回的数据put进各个消费者的队列
        for x in src:
            for q in queues:
                q.put(x)

    return reg, run

path = 'test.log'  # 接受可变参数传递,如 path = 'test.log, /var/logs/'
regs, runs = dispatcher(load(path))

# 注册窗口,可以注册多个,根据各自的日志处理逻辑传入handler, width, interval即可。这样每个window函数都在各自的线程中运行,互不影响。
# regs(do_nothing_handler, 10, 5)

# 注册状态码处理函数
regs(status_handler, 10, 5)

# 注册useragent分析函数
regs(useragent_handler, 10, 10)  # useragent分析不是对时间窗口内的数据求和或求平均值,数据不需要有重复

# 运行
runs() 

查看更多关于nginx日志分析工具的详细内容...

  阅读:25次