Initial commit
This commit is contained in:
277
routerstats_collector.py
Executable file
277
routerstats_collector.py
Executable file
@@ -0,0 +1,277 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
#from socketserver import UnixStreamServer, BaseRequestHandler
|
||||||
|
import os
|
||||||
|
import io
|
||||||
|
from multiprocessing import Process, Queue
|
||||||
|
import queue
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from http import server, HTTPStatus
|
||||||
|
from socketserver import TCPServer
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
|
||||||
|
level=logging.DEBUG,
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
#DATASTORE = ['timestamp', 'zone', 'SRC', 'DST', 'SPT', 'DPT']
|
||||||
|
|
||||||
|
def datathingy(query_queue, query_response, collector_queue, signal_queue):
|
||||||
|
'''Handle the datastore. Updating every N secs, fetching anything from the collector queue, and responding to the query queue'''
|
||||||
|
|
||||||
|
datastore = None
|
||||||
|
logging.debug('Datastore starting')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# As the request is most important, we spin on that with a N ms timeout? Then fetch wathever our logparser got us, rince and repeat..
|
||||||
|
# Not the most elegant solution, but not that horrible:)
|
||||||
|
try:
|
||||||
|
#For the server query
|
||||||
|
#We support the following requests:
|
||||||
|
# loc-net, net_dnat
|
||||||
|
#The following arguments can be passed:
|
||||||
|
# 'all' -> number of entries of specified type since program was started
|
||||||
|
# 'since' -> SQL style date query...
|
||||||
|
# N (minute|hour|day)
|
||||||
|
request_query = query_queue.get(timeout=0.1)
|
||||||
|
req = request_query.split("\n")
|
||||||
|
request = req[0]
|
||||||
|
argument = req[1:]
|
||||||
|
logging.debug('Got request: ' + request)
|
||||||
|
logging.debug('Arguments:')
|
||||||
|
logging.debug(argument)
|
||||||
|
|
||||||
|
if datastore is None:
|
||||||
|
logging.debug('datastore has no data at this point')
|
||||||
|
query_response.put('NA')
|
||||||
|
continue
|
||||||
|
|
||||||
|
if argument[0] == 'all':
|
||||||
|
query_response.put(datastore.loc[datastore['zone'] == request].shape[0])
|
||||||
|
else:
|
||||||
|
#This better contain two things; a string representing a keyword argument for timedelta
|
||||||
|
#and something that can be parsed as an integer
|
||||||
|
try:
|
||||||
|
subtract_argument = int(argument[1])
|
||||||
|
except ValueError:
|
||||||
|
logging.fatal('Could not parse argument as int')
|
||||||
|
next
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
if argument[0] == 'minutes':
|
||||||
|
to_subtract = timedelta(minutes = subtract_argument)
|
||||||
|
elif argument[0] == 'hours':
|
||||||
|
to_subtract = timedelta(hours = subtract_argument)
|
||||||
|
elif argument[0] == 'days':
|
||||||
|
to_subtract = timedelta(days = subtract_argument)
|
||||||
|
elif argument[0] == 'weeks':
|
||||||
|
to_subtract = timedelta(weeks = subtract_argument)
|
||||||
|
elif argument[0] == 'months':
|
||||||
|
to_subtract = timedelta(days = subtract_argument)
|
||||||
|
else:
|
||||||
|
logging.fatal('Was passed unknown argument: %s', argument[0])
|
||||||
|
next
|
||||||
|
timestamp = datetime.now() - to_subtract
|
||||||
|
tmpstore = datastore.loc[datastore['timestamp'] > timestamp]
|
||||||
|
query_response.put(tmpstore.loc[tmpstore['zone'] == request].shape[0])
|
||||||
|
|
||||||
|
except (queue.Empty, KeyboardInterrupt):
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
#For the log parser...
|
||||||
|
collector_query = collector_queue.get(False)
|
||||||
|
tmpstore = pd.DataFrame(collector_query)
|
||||||
|
if datastore is None:
|
||||||
|
datastore = tmpstore
|
||||||
|
else:
|
||||||
|
datastore = pd.concat([datastore, tmpstore], ignore_index=True)
|
||||||
|
except (queue.Empty, KeyboardInterrupt):
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
#Last, the signals..
|
||||||
|
signal_query = signal_queue.get(False)
|
||||||
|
if signal_query == 'Quit':
|
||||||
|
logging.debug('DataStore quitting')
|
||||||
|
break
|
||||||
|
except (queue.Empty, KeyboardInterrupt):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def filefetcher(filename: str, collector_queue, sleep_sec=0.1):
|
||||||
|
'''Latch onto a file, putting any new lines onto the queue'''
|
||||||
|
line = ''
|
||||||
|
with open(filename, 'r') as input_file:
|
||||||
|
logging.debug('Following ' + str(input_file))
|
||||||
|
#Start at the end of the file
|
||||||
|
input_file.seek(0, io.SEEK_END)
|
||||||
|
while True:
|
||||||
|
tmp = input_file.readline()
|
||||||
|
if tmp is not None and tmp != '':
|
||||||
|
line += tmp
|
||||||
|
if line.endswith("\n"):
|
||||||
|
line.rstrip()
|
||||||
|
to_ret = parse_line(line)
|
||||||
|
collector_queue.put(to_ret)
|
||||||
|
line = ''
|
||||||
|
elif sleep_sec:
|
||||||
|
time.sleep(sleep_sec)
|
||||||
|
|
||||||
|
def parse_line(input_line: str) -> dict:
|
||||||
|
'''Parse input line into (at least) the following details:
|
||||||
|
zone:
|
||||||
|
net_dnat or loc_net
|
||||||
|
SRC:
|
||||||
|
source ip
|
||||||
|
DST:
|
||||||
|
destination ip
|
||||||
|
PROTO:
|
||||||
|
protcol
|
||||||
|
SPT:
|
||||||
|
source port
|
||||||
|
DPT
|
||||||
|
destination port'''
|
||||||
|
retval = {}
|
||||||
|
input_line = input_line.split("\n")[0] #No idea why it appends a newline if I don't do this?
|
||||||
|
space_split = input_line.split(' ')
|
||||||
|
retval['zone'] = space_split[4]
|
||||||
|
for stringy in space_split:
|
||||||
|
try:
|
||||||
|
key, val = stringy.split('=')
|
||||||
|
retval[str(key)] = [str(val)]
|
||||||
|
except ValueError:
|
||||||
|
#this might not match up as expected:)
|
||||||
|
pass
|
||||||
|
retval['timestamp'] = datetime.now()
|
||||||
|
logging.debug('Parsed line to ' + str(retval))
|
||||||
|
return retval
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class RequestHandler(server.SimpleHTTPRequestHandler):
|
||||||
|
'''Subclassing to change behaviour'''
|
||||||
|
def set_queues(self, queue_in, queue_out):
|
||||||
|
self.queue_in = queue_in
|
||||||
|
self.queue_out = queue_out
|
||||||
|
|
||||||
|
def send_head(self):
|
||||||
|
self.send_response(HTTPStatus.OK)
|
||||||
|
self.send_header("Content-type", "text/html")
|
||||||
|
self.end_headers()
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == '/favicon.ico':
|
||||||
|
self.send_response(HTTPStatus.NOT_FOUND);
|
||||||
|
self.end_headers()
|
||||||
|
return
|
||||||
|
self.send_head()
|
||||||
|
r = []
|
||||||
|
r.append('<!DOCTYPE HTML>')
|
||||||
|
r.append('<html lang=en>')
|
||||||
|
r.append('<head><title>Statistics</title></head>')
|
||||||
|
r.append('<style>')
|
||||||
|
r.append('table, th, td { border: 1px solid black; border-collapse: collapse; }')
|
||||||
|
r.append('td:not(:first-child) { text-align: right; }')
|
||||||
|
r.append('table { width: 90%; }')
|
||||||
|
r.append('</style>')
|
||||||
|
r.append('<body>\n')
|
||||||
|
r.extend(self.fill_body())
|
||||||
|
r.append('</body>\n')
|
||||||
|
r.append('</html>\n')
|
||||||
|
encoded = '\n'.join(r).encode('UTF-8', 'surrogateescape')
|
||||||
|
f = io.BytesIO()
|
||||||
|
f.write(encoded)
|
||||||
|
f.seek(0)
|
||||||
|
self.copyfile(f, self.wfile)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def fetch_data(self, command: str, argument: list):
|
||||||
|
'''Communicate with datastore via queue'''
|
||||||
|
r = []
|
||||||
|
if type(argument) is str:
|
||||||
|
arg = []
|
||||||
|
arg.append(argument)
|
||||||
|
argument = arg
|
||||||
|
str_args = []
|
||||||
|
for args in argument:
|
||||||
|
str_args.append(str(args))
|
||||||
|
self.server.queue_out.put(command + "\n" + "\n".join(str_args))
|
||||||
|
try:
|
||||||
|
retval = self.server.queue_in.get(timeout=1)
|
||||||
|
except queue.Empty:
|
||||||
|
retval = 'query timeout'
|
||||||
|
logging.debug('Returning: %s', retval)
|
||||||
|
return str(retval)
|
||||||
|
|
||||||
|
def fetch_data_for_connections(self, direction: str):
|
||||||
|
r = []
|
||||||
|
r.append('<tr>')
|
||||||
|
r.append('<td>' + direction + '</td>')
|
||||||
|
for tofetch in (['all'], ['minutes', '1'], ['hours', '1'], ['days', '1'], ['weeks', 1], ['months', '1']):
|
||||||
|
retval = self.fetch_data(direction, tofetch)
|
||||||
|
r.append('<td>' + str(retval) + '</td>')
|
||||||
|
r.append('</tr>')
|
||||||
|
return r
|
||||||
|
|
||||||
|
def fill_body(self):
|
||||||
|
r = []
|
||||||
|
r.append('<h1>Statistics</h1>')
|
||||||
|
r.append('<table><tr><th>Direction</th><th>All</th><th>Last minute</th><th>Last hour</th><th>Last day</th><th>Last week</th><th>Last month</th></tr>')
|
||||||
|
for direction in ('net_dnat', 'loc-net'):
|
||||||
|
r.extend(self.fetch_data_for_connections(direction))
|
||||||
|
r.append('</table>')
|
||||||
|
return r
|
||||||
|
|
||||||
|
def serve_http(httpd_port: int, queue_in, queue_out):
|
||||||
|
TCPServer.allow_reuse_address = True
|
||||||
|
with server.ThreadingHTTPServer(("", httpd_port), RequestHandler) as httpd:
|
||||||
|
httpd.queue_in = queue_in
|
||||||
|
httpd.queue_out = queue_out
|
||||||
|
logging.debug("serving at port %s", httpd_port)
|
||||||
|
httpd.serve_forever()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
'''Main thingy'''
|
||||||
|
httpd_port = 8000
|
||||||
|
file_to_follow = '/var/log/ulog/syslogemu.log'
|
||||||
|
query_queue = Queue()
|
||||||
|
query_response = Queue()
|
||||||
|
collector_queue = Queue()
|
||||||
|
signal_queue = Queue()
|
||||||
|
|
||||||
|
|
||||||
|
started_processes = []
|
||||||
|
|
||||||
|
datastore_process = Process(target=datathingy, args=(query_queue, query_response, collector_queue, signal_queue))
|
||||||
|
datastore_process.start()
|
||||||
|
started_processes.append(datastore_process)
|
||||||
|
|
||||||
|
collector_process = Process(target=filefetcher, args=(file_to_follow, collector_queue))
|
||||||
|
collector_process.start()
|
||||||
|
started_processes.append(collector_process)
|
||||||
|
|
||||||
|
http_process = Process(target=serve_http, args=(httpd_port, query_response, query_queue))
|
||||||
|
http_process.start()
|
||||||
|
started_processes.append(http_process)
|
||||||
|
|
||||||
|
# serve_http(httpd_port, query_response, query_queue)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
for p in started_processes:
|
||||||
|
if p.is_alive():
|
||||||
|
pass
|
||||||
|
time.sleep(0.1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
for p in started_processes:
|
||||||
|
if p.is_alive():
|
||||||
|
retval = p.join(timeout=1)
|
||||||
|
if retval is None:
|
||||||
|
p.kill()
|
||||||
|
break
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user