#!/usr/bin/env python3
"""Collect firewall log statistics and publish them on a small HTTP page.

Three worker processes cooperate through multiprocessing queues:

* ``filefetcher`` follows a log file and parses every new line,
* ``datathingy`` keeps the parsed records in a pandas DataFrame and answers
  count queries,
* ``serve_http`` renders the counts as an HTML table.

Provenance: recovered from the "Initial commit" patch
(ddc6236eefcdccaa0a16a0c19f4836066e149508, Daniel Lysfjord, 2024-03-31) that
creates ``routerstats_collector.py``.
"""

import os
import io
from multiprocessing import Process, Queue
import queue
import threading
from datetime import datetime, timedelta
import logging
import time
from http import server, HTTPStatus
from socketserver import TCPServer

import pandas as pd
import numpy as np

logging.basicConfig(
    format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S')

# Answer sent back when a query cannot be satisfied (no data yet, bad query).
_NO_ANSWER = 'NA'

# Length of one unit for every supported time-window keyword.  A month is
# approximated as 30 days because timedelta has no calendar-month unit.
_WINDOW_UNITS = {
    'minutes': timedelta(minutes=1),
    'hours': timedelta(hours=1),
    'days': timedelta(days=1),
    'weeks': timedelta(weeks=1),
    'months': timedelta(days=30),
}

# Upper bound on records ingested per loop, so a busy log cannot starve queries.
_MAX_INGEST_BATCH = 1000


def _ingest_pending(datastore, collector_queue):
    """Move every record waiting on *collector_queue* into the datastore.

    Returns the (possibly new) DataFrame, or *datastore* unchanged when
    nothing was waiting.
    """
    frames = []
    for _ in range(_MAX_INGEST_BATCH):
        try:
            record = collector_queue.get(False)
        except queue.Empty:
            break
        try:
            # index=[0] makes this a one-row frame even when the record holds
            # only scalar values (a log line without any key=value token).
            frames.append(pd.DataFrame(record, index=[0]))
        except (ValueError, TypeError):
            logging.warning('Dropping malformed record: %r', record)
    if not frames:
        return datastore
    if datastore is not None:
        frames.insert(0, datastore)
    return pd.concat(frames, ignore_index=True)


def _answer_query(datastore, request_query):
    """Return the number of stored records matching *request_query*.

    The query is ``"<zone>\\n<selector>[\\n<count>]"`` where selector is
    ``all`` or one of the keys of ``_WINDOW_UNITS``.  Returns ``'NA'`` when
    there is no data yet or the query is malformed.
    """
    zone, *arguments = request_query.split("\n")
    logging.debug('Got request: %s', zone)
    logging.debug('Arguments: %s', arguments)

    if datastore is None:
        logging.debug('datastore has no data at this point')
        return _NO_ANSWER
    if not arguments:
        logging.warning('Query for %s carried no arguments', zone)
        return _NO_ANSWER

    selector = arguments[0]
    rows = datastore
    if selector != 'all':
        unit = _WINDOW_UNITS.get(selector)
        if unit is None:
            logging.warning('Was passed unknown argument: %s', selector)
            return _NO_ANSWER
        try:
            cutoff = datetime.now() - unit * int(arguments[1])
        except (IndexError, ValueError, OverflowError):
            logging.warning('Could not parse argument as int')
            return _NO_ANSWER
        rows = datastore.loc[datastore['timestamp'] > cutoff]
    return int((rows['zone'] == zone).sum())


def datathingy(query_queue, query_response, collector_queue, signal_queue):
    """Own the datastore: ingest parsed log records and answer count queries.

    Each loop iteration drains *collector_queue*, waits up to 0.1 s for one
    query on *query_queue* and always puts exactly one answer on
    *query_response* for it, then checks *signal_queue*.  The loop ends when
    the string ``'Quit'`` arrives on *signal_queue*.

    Supported queries (see ``_answer_query``): a zone name followed by
    ``all`` (every record since start) or a window keyword and a count,
    e.g. ``"net_dnat\\nhours\\n1"``.
    """
    datastore = None
    logging.debug('Datastore starting')

    while True:
        try:
            datastore = _ingest_pending(datastore, collector_queue)

            try:
                request_query = query_queue.get(timeout=0.1)
            except queue.Empty:
                pass
            else:
                try:
                    answer = _answer_query(datastore, request_query)
                except Exception:
                    # The requester is blocked waiting; never leave it
                    # without a reply, and never let one bad query end the
                    # process.
                    logging.exception('Query failed: %r', request_query)
                    answer = _NO_ANSWER
                query_response.put(answer)

            try:
                if signal_queue.get(False) == 'Quit':
                    logging.debug('DataStore quitting')
                    break
            except queue.Empty:
                pass
        except KeyboardInterrupt:
            # Ctrl-C is delivered to the whole process group; keep serving
            # until the parent asks us to quit.
            pass


def filefetcher(filename: str, collector_queue, sleep_sec=0.1):
    """Follow *filename* and put every newly appended line, parsed, on the queue.

    Reading starts at the current end of the file.  Incomplete lines are
    buffered until their newline arrives.  When nothing new is available the
    function sleeps *sleep_sec* seconds (no sleep if it is falsy).  Lines
    that ``parse_line`` rejects are logged and skipped.  Never returns.
    """
    pending = ''
    # errors='replace': a stray undecodable byte must not stop the collector.
    with open(filename, 'r', errors='replace') as input_file:
        logging.debug('Following %s', input_file)
        # Start at the end of the file
        input_file.seek(0, io.SEEK_END)
        while True:
            chunk = input_file.readline()
            if not chunk:
                if sleep_sec:
                    time.sleep(sleep_sec)
                continue
            pending += chunk
            if not pending.endswith("\n"):
                continue  # partial line, wait for the rest
            line, pending = pending, ''
            try:
                collector_queue.put(parse_line(line))
            except IndexError:
                logging.warning('Skipping unparsable line: %r', line)


def parse_line(input_line: str) -> dict:
    """Parse one log line into a record for the datastore.

    The line is split on whitespace.  The fifth field becomes ``zone`` (the
    queries in this file use ``net_dnat`` and ``loc-net``) and every
    ``KEY=VALUE`` token becomes ``record[KEY] = [VALUE]``, e.g. SRC, DST,
    PROTO, SPT, DPT.  ``timestamp`` is the time of parsing, not the time
    written in the log line.

    Raises:
        IndexError: the line has fewer than five fields.
    """
    # Only the first physical line is considered.  split() without arguments
    # collapses runs of blanks, so a space-padded day ("Apr  1") does not
    # shift the field positions.
    fields = input_line.split("\n", 1)[0].split()
    zone = fields[4]

    retval = {}
    for field in fields:
        key, separator, val = field.partition('=')
        if separator and key:
            retval[key] = [val]
    # Assigned last so log content can never override these two columns.
    retval['zone'] = zone
    retval['timestamp'] = datetime.now()
    logging.debug('Parsed line to %s', retval)
    return retval


class RequestHandler(server.SimpleHTTPRequestHandler):
    """Serve the statistics page; every path except /favicon.ico gets it.

    The queues used to talk to the datastore are read from the server object
    (``self.server.queue_in`` / ``self.server.queue_out``).

    NOTE(review): the HTML tags in this class were reconstructed; the copy
    this file was recovered from had its markup stripped.  Confirm against
    the intended page layout.
    """

    # The handler threads share one request queue and one response queue, so
    # a query/answer round trip has to be exclusive to stay paired.
    _query_lock = threading.Lock()

    def set_queues(self, queue_in, queue_out):
        """Attach queues directly to this handler instance."""
        self.queue_in = queue_in
        self.queue_out = queue_out

    def send_head(self):
        """Send a 200 response with HTML content-type headers only."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer GET with the statistics page (404 for /favicon.ico)."""
        if self.path == '/favicon.ico':
            self.send_response(HTTPStatus.NOT_FOUND)
            self.end_headers()
            return
        self.send_head()
        page = [
            '<!DOCTYPE html>',
            '<html>',
            '<head><title>Statistics</title></head>',
            '<body>\n',
            *self.fill_body(),
            '</body>\n',
            '</html>\n',
        ]
        self.wfile.write('\n'.join(page).encode('UTF-8', 'surrogateescape'))

    def fetch_data(self, command: str, argument: list):
        """Ask the datastore for one count and return the answer as a string.

        *command* is the zone name; *argument* is a list (or a single
        string) of query arguments.  Returns ``'query timeout'`` when no
        answer arrives within one second.
        """
        if isinstance(argument, str):
            argument = [argument]
        message = command + "\n" + "\n".join(str(arg) for arg in argument)

        with self._query_lock:
            # Anything still queued is the late answer to a query that
            # already timed out; drop it so it is not taken for ours.
            while True:
                try:
                    self.server.queue_in.get_nowait()
                except queue.Empty:
                    break
            self.server.queue_out.put(message)
            try:
                retval = self.server.queue_in.get(timeout=1)
            except queue.Empty:
                retval = 'query timeout'
        logging.debug('Returning: %s', retval)
        return str(retval)

    def fetch_data_for_connections(self, direction: str):
        """Return the HTML table row (as a list of strings) for one zone."""
        windows = (['all'], ['minutes', '1'], ['hours', '1'],
                   ['days', '1'], ['weeks', '1'], ['months', '1'])
        row = ['<tr>', '<td>' + direction + '</td>']
        for tofetch in windows:
            row.append('<td>' + self.fetch_data(direction, tofetch) + '</td>')
        row.append('</tr>')
        return row

    def fill_body(self):
        """Return the page body: a heading and one table row per zone."""
        headings = ('Direction', 'All', 'Last minute', 'Last hour',
                    'Last day', 'Last week', 'Last month')
        body = [
            '<h1>Statistics</h1>',
            '<table><tr>'
            + ''.join('<th>' + heading + '</th>' for heading in headings)
            + '</tr>',
        ]
        for direction in ('net_dnat', 'loc-net'):
            body.extend(self.fetch_data_for_connections(direction))
        body.append('</table>')
        return body


def serve_http(httpd_port: int, queue_in, queue_out):
    """Run the HTTP server on *httpd_port* until the process is stopped.

    *queue_in* delivers answers from the datastore and *queue_out* carries
    queries to it; both are stored on the server object for the handlers.
    """
    TCPServer.allow_reuse_address = True
    with server.ThreadingHTTPServer(("", httpd_port), RequestHandler) as httpd:
        httpd.queue_in = queue_in
        httpd.queue_out = queue_out
        logging.debug("serving at port %s", httpd_port)
        httpd.serve_forever()


def main():
    """Start the datastore, collector and HTTP processes and supervise them.

    Runs until interrupted with Ctrl-C or until one of the workers exits,
    then asks the datastore to quit and stops whatever is still running.
    """
    httpd_port = 8000
    file_to_follow = '/var/log/ulog/syslogemu.log'
    query_queue = Queue()
    query_response = Queue()
    collector_queue = Queue()
    signal_queue = Queue()

    workers = (
        (datathingy,
         (query_queue, query_response, collector_queue, signal_queue)),
        (filefetcher, (file_to_follow, collector_queue)),
        # The HTTP side reads answers and writes queries, hence the swap.
        (serve_http, (httpd_port, query_response, query_queue)),
    )
    started_processes = []
    for target, args in workers:
        process = Process(target=target, args=args)
        process.start()
        started_processes.append(process)

    try:
        while all(p.is_alive() for p in started_processes):
            time.sleep(0.1)
        logging.error('A worker process exited unexpectedly, shutting down')
    except KeyboardInterrupt:
        logging.debug('Interrupted, shutting down')

    signal_queue.put('Quit')
    for p in started_processes:
        # join() always returns None; liveness is what tells whether the
        # process actually stopped within the grace period.
        p.join(timeout=1)
        if p.is_alive():
            p.kill()
            p.join()


if __name__ == '__main__':
    main()