diff --git a/routerstats_collector.py b/routerstats_collector.py new file mode 100755 index 0000000..f79a8ce --- /dev/null +++ b/routerstats_collector.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 + +#from socketserver import UnixStreamServer, BaseRequestHandler +import os +import io +from multiprocessing import Process, Queue +import queue +from datetime import datetime, timedelta +import logging +import time +from http import server, HTTPStatus +from socketserver import TCPServer + +import pandas as pd +import numpy as np + +logging.basicConfig( + format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s', + level=logging.DEBUG, + datefmt='%Y-%m-%d %H:%M:%S') + +#DATASTORE = ['timestamp', 'zone', 'SRC', 'DST', 'SPT', 'DPT'] + +def datathingy(query_queue, query_response, collector_queue, signal_queue): + '''Handle the datastore. Updating every N secs, fetching anything from the collector queue, and responding to the query queue''' + + datastore = None + logging.debug('Datastore starting') + + while True: + # As the request is most important, we spin on that with a N ms timeout? Then fetch wathever our logparser got us, rince and repeat.. + # Not the most elegant solution, but not that horrible:) + try: + #For the server query + #We support the following requests: + # loc-net, net_dnat + #The following arguments can be passed: + # 'all' -> number of entries of specified type since program was started + # 'since' -> SQL style date query... + # N (minute|hour|day) + request_query = query_queue.get(timeout=0.1) + req = request_query.split("\n") + request = req[0] + argument = req[1:] + logging.debug('Got request: ' + request) + logging.debug('Arguments:') + logging.debug(argument) + + if datastore is None: + logging.debug('datastore has no data at this point') + query_response.put('NA') + continue + + if argument[0] == 'all': + query_response.put(datastore.loc[datastore['zone'] == request].shape[0]) + else: + #This better contain two things; a string representing a keyword argument for timedelta + #and something that can be parsed as an integer + try: + subtract_argument = int(argument[1]) + except ValueError: + logging.fatal('Could not parse argument as int') + next + except IndexError: + pass + if argument[0] == 'minutes': + to_subtract = timedelta(minutes = subtract_argument) + elif argument[0] == 'hours': + to_subtract = timedelta(hours = subtract_argument) + elif argument[0] == 'days': + to_subtract = timedelta(days = subtract_argument) + elif argument[0] == 'weeks': + to_subtract = timedelta(weeks = subtract_argument) + elif argument[0] == 'months': + to_subtract = timedelta(days = subtract_argument) + else: + logging.fatal('Was passed unknown argument: %s', argument[0]) + next + timestamp = datetime.now() - to_subtract + tmpstore = datastore.loc[datastore['timestamp'] > timestamp] + query_response.put(tmpstore.loc[tmpstore['zone'] == request].shape[0]) + + except (queue.Empty, KeyboardInterrupt): + pass + try: + #For the log parser... + collector_query = collector_queue.get(False) + tmpstore = pd.DataFrame(collector_query) + if datastore is None: + datastore = tmpstore + else: + datastore = pd.concat([datastore, tmpstore], ignore_index=True) + except (queue.Empty, KeyboardInterrupt): + pass + try: + #Last, the signals.. + signal_query = signal_queue.get(False) + if signal_query == 'Quit': + logging.debug('DataStore quitting') + break + except (queue.Empty, KeyboardInterrupt): + pass + +def filefetcher(filename: str, collector_queue, sleep_sec=0.1): + '''Latch onto a file, putting any new lines onto the queue''' + line = '' + with open(filename, 'r') as input_file: + logging.debug('Following ' + str(input_file)) + #Start at the end of the file + input_file.seek(0, io.SEEK_END) + while True: + tmp = input_file.readline() + if tmp is not None and tmp != '': + line += tmp + if line.endswith("\n"): + line.rstrip() + to_ret = parse_line(line) + collector_queue.put(to_ret) + line = '' + elif sleep_sec: + time.sleep(sleep_sec) + +def parse_line(input_line: str) -> dict: + '''Parse input line into (at least) the following details: + zone: + net_dnat or loc_net + SRC: + source ip + DST: + destination ip + PROTO: + protcol + SPT: + source port + DPT + destination port''' + retval = {} + input_line = input_line.split("\n")[0] #No idea why it appends a newline if I don't do this? + space_split = input_line.split(' ') + retval['zone'] = space_split[4] + for stringy in space_split: + try: + key, val = stringy.split('=') + retval[str(key)] = [str(val)] + except ValueError: + #this might not match up as expected:) + pass + retval['timestamp'] = datetime.now() + logging.debug('Parsed line to ' + str(retval)) + return retval + + + +class RequestHandler(server.SimpleHTTPRequestHandler): + '''Subclassing to change behaviour''' + def set_queues(self, queue_in, queue_out): + self.queue_in = queue_in + self.queue_out = queue_out + + def send_head(self): + self.send_response(HTTPStatus.OK) + self.send_header("Content-type", "text/html") + self.end_headers() + + def do_GET(self): + if self.path == '/favicon.ico': + self.send_response(HTTPStatus.NOT_FOUND); + self.end_headers() + return + self.send_head() + r = [] + r.append('') + r.append('') + r.append('
| Direction | All | Last minute | Last hour | Last day | Last week | Last month |
|---|