Files
routerstats/routerstats_collector.py

263 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import io
from multiprocessing import Process, Queue
import queue
from datetime import datetime, timedelta
import logging
import time
from http import server, HTTPStatus
from socketserver import TCPServer
import pandas as pd
import numpy as np
# Configure root logging once at import time: timestamped lines that include
# the emitting function name (padded for column alignment); default level INFO.
logging.basicConfig(
    format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')
def datathingy(query_queue, query_response, collector_queue, signal_queue):
    '''Own the in-memory datastore (a pandas DataFrame).

    Runs as a worker, multiplexing three queues forever:
      * query_queue: requests with newline-separated fields -- a zone name,
        then either 'all' or a time unit plus an integer amount.  The
        matching row count (or 'NA' on error / no data yet) is put on
        query_response, so a client is never left waiting for a reply.
      * collector_queue: parsed log records (dicts from parse_line) that
        are appended to the datastore.
      * signal_queue: control channel; the message 'Quit' ends the loop.
    '''
    # timedelta() has no month unit, so 'months' is approximated as 30 days.
    unit_to_delta = {
        'minutes': lambda n: timedelta(minutes=n),
        'hours': lambda n: timedelta(hours=n),
        'days': lambda n: timedelta(days=n),
        'weeks': lambda n: timedelta(weeks=n),
        'months': lambda n: timedelta(days=30 * n),
    }
    datastore = None
    logging.debug('Datastore starting')
    while True:
        # Queries are the most latency-sensitive, so poll them with a short
        # timeout; the other two queues are then drained without blocking.
        try:
            request_query = query_queue.get(timeout=0.1)
            req = request_query.split("\n")
            request = req[0]
            argument = req[1:]
            logging.debug('Got request: %s', request)
            logging.debug('Arguments: %s', argument)
            if datastore is None or not argument:
                logging.debug('No data yet, or malformed request')
                query_response.put('NA')
            elif argument[0] == 'all':
                query_response.put(datastore.loc[datastore['zone'] == request].shape[0])
            else:
                # Expect a known time unit plus something parseable as int.
                # Answer 'NA' on any parse failure instead of crashing with an
                # undefined to_subtract (original used a no-op bare `next`).
                try:
                    to_subtract = unit_to_delta[argument[0]](int(argument[1]))
                except (KeyError, IndexError, ValueError):
                    logging.error('Could not parse query arguments: %s', argument)
                    query_response.put('NA')
                    continue
                cutoff = datetime.now() - to_subtract
                recent = datastore.loc[datastore['timestamp'] > cutoff]
                query_response.put(recent.loc[recent['zone'] == request].shape[0])
        except (queue.Empty, KeyboardInterrupt):
            pass
        try:
            # New rows from the log parser; one message per parsed log line.
            collector_query = collector_queue.get(False)
            tmpstore = pd.DataFrame(collector_query)
            if datastore is None:
                datastore = tmpstore
            else:
                datastore = pd.concat([datastore, tmpstore], ignore_index=True)
        except (queue.Empty, KeyboardInterrupt):
            pass
        try:
            # Control channel: only 'Quit' is recognised.
            signal_query = signal_queue.get(False)
            if signal_query == 'Quit':
                logging.debug('DataStore quitting')
                break
        except (queue.Empty, KeyboardInterrupt):
            pass
def filefetcher(filename: str, collector_queue, sleep_sec=0.1):
    '''Tail *filename* (like ``tail -f``), parsing every new complete line
    and putting the resulting dict onto *collector_queue*.

    filename: path of the log file to follow.
    collector_queue: queue that receives parse_line()'s dict per line.
    sleep_sec: pause between polls when no new data is available;
        a falsy value makes the loop busy-spin.
    Never returns; runs until the process is terminated.
    '''
    line = ''
    with open(filename, 'r') as input_file:
        logging.debug('Following %s', input_file)
        # Seek to the end so only lines written after start-up are reported.
        input_file.seek(0, io.SEEK_END)
        while True:
            tmp = input_file.readline()
            # readline() returns '' at EOF and may return a partial line if
            # the writer has not finished it, so accumulate until a newline.
            if tmp:
                line += tmp
                if line.endswith("\n"):
                    # Strip the newline here (the original called rstrip()
                    # and discarded the result -- strings are immutable).
                    collector_queue.put(parse_line(line.rstrip("\n")))
                    line = ''
            elif sleep_sec:
                time.sleep(sleep_sec)
def parse_line(input_line: str) -> dict:
    '''Parse one firewall log line into a dict of its details.

    The result holds at least:
      zone      -- fifth whitespace-separated token (e.g. net_dnat / loc_net)
      timestamp -- wall-clock time the line was parsed
    plus one single-element-list entry per KEY=VALUE token on the line
    (SRC, DST, PROTO, SPT, DPT, ...).  Tokens without exactly one '=' are
    ignored -- log entries are not perfectly symmetrical and we don't care.
    '''
    # readline() keeps the trailing newline; keep only the text before it.
    payload = input_line.split("\n")[0]
    tokens = payload.split(' ')
    parsed = {'zone': tokens[4]}
    for token in tokens:
        # Only well-formed KEY=VALUE pairs; 0 or 2+ '=' signs are skipped,
        # matching the original's ValueError-on-unpack behaviour.
        if token.count('=') == 1:
            key, value = token.split('=')
            parsed[str(key)] = [str(value)]
    parsed['timestamp'] = datetime.now()
    logging.debug('Parsed line to ' + str(parsed))
    return parsed
class RequestHandler(server.SimpleHTTPRequestHandler):
    '''Serves a single generated statistics page.

    Subclasses SimpleHTTPRequestHandler only for its plumbing; nothing is
    read from the filesystem.  Data comes from the datastore process via
    the queue_in/queue_out attributes attached to the server object by
    serve_http().
    '''

    def send_head(self, content_length=None):
        '''Send a 200 status and HTML headers for a generated page.

        content_length, when given, is emitted as Content-Length (the
        parameter defaults to None so existing callers are unaffected).
        '''
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html")
        if content_length is not None:
            self.send_header("Content-Length", str(content_length))
        self.end_headers()

    def do_GET(self):
        '''Answer every GET with the statistics page; 404 for favicon.'''
        if self.path == '/favicon.ico':
            self.send_response(HTTPStatus.NOT_FOUND)
            self.end_headers()
            return
        r = []
        r.append('<!DOCTYPE HTML>')
        r.append('<html lang=en>')
        r.append('<head><title>Statistics</title></head>')
        r.append('<style>')
        r.append('table, th, td { border: 1px solid black; border-collapse: collapse; }')
        r.append('td:not(:first-child) { text-align: right; }')
        r.append('table { width: 90%; }')
        r.append('</style>')
        r.append('<body>\n')
        r.extend(self.fill_body())
        r.append('</body>\n')
        r.append('</html>\n')
        # Build the whole body before sending headers, so a failure while
        # gathering data cannot leave a half-sent 200 response.
        encoded = '\n'.join(r).encode('UTF-8', 'surrogateescape')
        self.send_head(len(encoded))
        self.wfile.write(encoded)

    def fetch_data(self, command: str, argument: list):
        '''Round-trip one query to the datastore process via the queues.

        command: zone name the datastore should count rows for.
        argument: list of query arguments; a lone string is wrapped too.
        Returns the datastore's answer as a string, or 'query timeout'
        when no reply arrives within one second.
        '''
        if isinstance(argument, str):
            argument = [argument]
        str_args = [str(arg) for arg in argument]
        self.server.queue_out.put(command + "\n" + "\n".join(str_args))
        try:
            retval = self.server.queue_in.get(timeout=1)
        except queue.Empty:
            retval = 'query timeout'
        logging.debug('Returning: %s', retval)
        return str(retval)

    def fetch_data_for_connections(self, direction: str):
        '''Build one HTML table row of counts for every time window.'''
        r = []
        r.append('<tr>')
        r.append('<td>' + direction + '</td>')
        for tofetch in (['all'], ['minutes', '1'], ['hours', '1'],
                        ['days', '1'], ['weeks', '1'], ['months', '1']):
            retval = self.fetch_data(direction, tofetch)
            r.append('<td>' + str(retval) + '</td>')
        r.append('</tr>')
        return r

    def fill_body(self):
        '''Build the page body: heading plus one table row per direction.'''
        r = []
        r.append('<h1>Statistics</h1>')
        r.append('<table><tr><th>Direction</th><th>All</th><th>Last minute</th><th>Last hour</th><th>Last day</th><th>Last week</th><th>Last month</th></tr>')
        for direction in ('net_dnat', 'loc-net'):
            r.extend(self.fetch_data_for_connections(direction))
        r.append('</table>')
        return r
def serve_http(httpd_port: int, queue_in, queue_out):
    '''Run the HTTP front-end forever on *httpd_port*.

    queue_in/queue_out are attached to the server object so that the
    RequestHandler instances it spawns can talk to the datastore process.
    Never returns; runs until the process is terminated.
    '''
    # Allow quick restarts without waiting for TIME_WAIT sockets to clear.
    TCPServer.allow_reuse_address = True
    bind_address = ("", httpd_port)
    with server.ThreadingHTTPServer(bind_address, RequestHandler) as web_server:
        web_server.queue_in = queue_in
        web_server.queue_out = queue_out
        logging.info("serving at port %s", httpd_port)
        web_server.serve_forever()
def main():
    '''Wire up and supervise the three worker processes.

    * datathingy  -- owns the in-memory datastore
    * filefetcher -- tails the log file and feeds the datastore
    * serve_http  -- HTTP front-end answering statistics queries

    Blocks until interrupted (Ctrl-C), then asks the datastore to quit
    via its signal queue, joins every worker, and kills any survivor.
    '''
    httpd_port = 8000
    file_to_follow = '/var/log/ulog/syslogemu.log'
    query_queue = Queue()
    query_response = Queue()
    collector_queue = Queue()
    signal_queue = Queue()
    started_processes = []
    datastore_process = Process(target=datathingy,
                                args=(query_queue, query_response, collector_queue, signal_queue))
    datastore_process.start()
    started_processes.append(datastore_process)
    collector_process = Process(target=filefetcher, args=(file_to_follow, collector_queue))
    collector_process.start()
    started_processes.append(collector_process)
    http_process = Process(target=serve_http, args=(httpd_port, query_response, query_queue))
    http_process.start()
    started_processes.append(http_process)
    try:
        # Idle supervision loop; the workers do all the actual work.
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Give the datastore its graceful-shutdown message, then reap
        # everything; whatever does not stop within a second gets killed.
        signal_queue.put('Quit')
        for p in started_processes:
            p.join(timeout=1)
            # join() always returns None, so liveness must be checked
            # explicitly (the original tested join()'s return value and
            # therefore killed unconditionally).
            if p.is_alive():
                p.kill()

if __name__ == '__main__':
    main()