diff --git a/routerstats_client.py b/routerstats_client.py new file mode 100755 index 0000000..8b7f987 --- /dev/null +++ b/routerstats_client.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 + +import socket +import logging +import time +import os + +import rrdtool + +logging.basicConfig( + format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S') + +class NotConnected(Exception): + logging.error('NotConnected error') + pass + +class routerstats_client(): + def __init__(self, host, port): + self.host = host + self.port = port + self.connected = False + + def connect(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + self.sock.connect((self.host, self.port)) + self.sock.settimeout(1) + logging.info('Connected to ' + str(self.host)) + self.connected = True + except ConnectionRefusedError as error: + logging.error('Could not connect to ' + str(self.host) + ':' + str(self.port)) + raise ConnectionRefusedError(error) + + def send(self, tosend): + if self.connected: + logging.debug('Sending ' + str(tosend)) + send_bytes = bytes(tosend + "\n", 'utf-8') + try: + self.sock.send(send_bytes) + except OSError as error: + logging.error('Cound not send to server: ' + str(error)) + self.connected = False + raise NotConnected + try: + self.sock.getpeername() + except OSError as error: + logging.error('Could not send to server: ' + str(error)) + self.connected = False + raise NotConnected + else: + logging.error('Not connected to server') + raise NotConnected('Not connected to server') + + def recv(self): + if self.connected != True: + logging.error('Trying to recv when not connected') + raise NotConnected + line = b'' + blanks = 0 + while True: + try: + toret = self.sock.recv(1) + if toret: + line += toret + if blanks > 0: + blanks -= 1 + if line.endswith(b'\n'): + #We're done for now, returning value + line = line.strip().decode('utf-8') + logging.debug('Received from server: ' + str(line)) + return line + else: + blanks += 1 + if blanks >= 5: + blanks = 0 +# break + logging.debug('Too many blank reads, and still no complete line') + self.connected = False + raise NotConnected + except OSError as error: + if str(error) == 'timed out': + #This is expected, as it is how we loop + break + logging.error('OSError: ' + str(error)) + logging.debug('Got: ' + str(line)) + self.connected = False + raise NotConnected(error) + except TimeoutError: + logging.error('Timeout while fetching data, got: ' + str(line)) + break + +def handle_server_msg(received: str): + '''Do something about something from the server''' + try: + timestamp, zone = received.split() + except ValueError: + logging.error('Could not parse ' + str(received) + ' into two distinct values') + return + try: + timestamp = int(timestamp) + except ValueError: + logging.error('Could not parse ' + str(timestamp) + ' as an int') + return + toret = {'timestamp': timestamp, 'net_dnat': 0, 'loc-net': 0} + try: + toret[zone] += 1 + except KeyError as error: + logging.debug('Ignoring zone: ' + str(error)) + logging.debug('Parsed to: ' + str(toret)) + return toret + +def handle(received, client): + '''Do things with whatever came from the server''' + if not received: + return + if received == 'ping': + client.send('pong') + return True + elif received is not None: + if received: + return handle_server_msg(received) + +class UpdateRRD: + '''Handle updates to the rrd-file, since we cannot update more than once every second''' + def __init__(self, rrdfile): + self.rrdfile = rrdfile + if os.path.isfile(rrdfile) != True: + self.create() + self.toupdate = {'timestamp': None, 'net_dnat': 0, 'loc-net': 0} + self.freshdict = self.toupdate.copy() + + def create(self): + rrdtool.create(self.rrdfile, "--start", "1000000000", "--step", "10", "DS:net_dnat:ABSOLUTE:10:0:U", "DS:loc-net:ABSOLUTE:10:0:U", "RRA:AVERAGE:0.5:1:1d", "RRA:AVERAGE:0.5:1:1M") + logging.debug('Created rrdfile ' + str(self.rrdfile)) + + def push(self): + #Make sure we're not doing something stupid.. + info = rrdtool.info(self.rrdfile) + if self.toupdate['timestamp'] is None: + return False + if info['last_update'] >= self.toupdate['timestamp']: + logging.error('Trying to update when rrdfile is newer than our timestamp. Ignoring line and resetting.') + self.toupdate = self.freshdict.copy() + return False + try: + rrdtool.update( + self.rrdfile, + str(self.toupdate['timestamp']) + + ':' + + str(self.toupdate['net_dnat']) + + ':' + + str(self.toupdate['loc-net'])) + self.toupdate = self.freshdict.copy() + logging.debug('Updated rrdfile') + return True + except rrdtool.OperationalError as error: + logging.error(str(error)) + return False + + def add(self, input_dict): + if self.toupdate['timestamp'] is None: + #Never touched, just overwrite with whatever we got + self.toupdate = input_dict + elif input_dict['timestamp'] > self.toupdate['timestamp']: + #What we get is fresher than what we have, and noone has done a push yet + self.push() + self.toupdate = input_dict + elif input_dict['timestamp'] == self.toupdate['timestamp']: + #Same timestamp, just up values and be happy + self.toupdate['net_dnat'] += input_dict['net_dnat'] + self.toupdate['loc-net'] += input_dict['loc-net'] + elif input_dict['timestamp'] < self.toupdate['timestamp']: + logging.error('Newly fetched data is older than what we have in the queue already. Passing.') + else: + logging.error('Not sure what to do here? ' + str(input_dict) + str(self.toupdate)) + +def main(): + rrdfile = 'test.rrd' + rrdupdater = UpdateRRD(rrdfile) + client = routerstats_client('127.0.0.1', 9999) + while True: + try: + client.connect() + break + except ConnectionRefusedError: + time.sleep(1) + + tries = 0 + loops = 0 + while True: + try: + retval = handle(client.recv(), client) + if retval: + if retval == True: + loops = 0 + else: + rrdupdater.add(retval) + else: + rrdupdater.push() + loops += 1 + if loops >= 60: + logging.error('No data in 60 seconds. We expect a ping/pong every 30. Lost connection, probably') + loops = 0 + raise NotConnected + except NotConnected: + try: + client.connect() + tries = 0 + except ConnectionRefusedError: + logging.debug('ConnectionRefused') + tries += 1 + if tries >= 5: + tries = 5 + time.sleep(tries) + except ConnectionRefusedError: + logging.debug('ConnectionRefused') + time.sleep(1) + +if __name__ == '__main__': + main() diff --git a/routerstats_collector.py b/routerstats_collector.py index e65b73c..eb38fa1 100755 --- a/routerstats_collector.py +++ b/routerstats_collector.py @@ -1,261 +1,429 @@ #!/usr/bin/env python3 +import signal import os import io +import sys +import stat from multiprocessing import Process, Queue +import socketserver +import threading import queue from datetime import datetime, timedelta import logging import time -from http import server, HTTPStatus -from socketserver import TCPServer +import pickle -import pandas as pd -import numpy as np +from setproctitle import setproctitle logging.basicConfig( format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s', - level=logging.INFO, + level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S') -def datathingy(query_queue, query_response, collector_queue, signal_queue): - '''Handle the datastore. Updating every N secs, fetching anything from the collector queue, and responding to the query queue''' +class TimeToQuit(Exception): + '''Used to pass Quit to subthreads''' + pass - datastore = None - logging.debug('Datastore starting') +class ReloadLog(Exception): + '''Used to reload log file''' + pass +def filefetcher(filename: str, collector_queue, signal_queue, sleep_sec=0.5, seek_pos=None): + '''Latch onto a file, putting any new lines onto the queue.''' + setproctitle('routerstats-collector file-fetcher') + if float(sleep_sec) <= 0.0: + logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)') + return False while True: - # As the request is most important, we spin on that with a N ms timeout? Then fetch wathever our logparser got us, rince and repeat.. - # Not the most elegant solution, but not that horrible:) try: - request_query = query_queue.get(timeout=0.1) - req = request_query.split("\n") - request = req[0] - argument = req[1:] - logging.debug('Got request: ' + request) - logging.debug('Arguments:') - logging.debug(argument) + input_file = open(filename, 'r') + except FileNotFoundError: + logging.debug('Retry opening ' + filename) + time.sleep(1) + continue - if datastore is None: - logging.debug('datastore has no data at this point') - query_response.put('NA') - continue + retries = 0 + line = '' + start_stat = os.stat(filename) + if seek_pos is None: + cur_pos = input_file.seek(0, 0) + else: + cur_pos = input_file.seek(seek_pos, io.SEEK_SET) - if argument[0] == 'all': - query_response.put(datastore.loc[datastore['zone'] == request].shape[0]) - else: - #This better contain two things; a string representing a keyword argument for timedelta - #and something that can be parsed as an integer - try: - subtract_argument = int(argument[1]) - except ValueError: - logging.fatal('Could not parse argument as int') - next - except IndexError: - pass - #If someone ever knows how this should be done..... - if argument[0] == 'minutes': - to_subtract = timedelta(minutes = subtract_argument) - elif argument[0] == 'hours': - to_subtract = timedelta(hours = subtract_argument) - elif argument[0] == 'days': - to_subtract = timedelta(days = subtract_argument) - elif argument[0] == 'weeks': - to_subtract = timedelta(weeks = subtract_argument) - elif argument[0] == 'months': - to_subtract = timedelta(days = subtract_argument) + logging.info('Following ' + filename + ' (inode ' + str(start_stat.st_ino) + ') from pos ' + str(cur_pos)) + + try: + while True: + tmp = input_file.readline() + if tmp is not None and tmp != '': + line += tmp + if line.endswith("\n"): + line.rstrip() + if line.isspace(): + logging.debug('Empty line is empty, thank you for the newline') + else: + logging.debug('Sending line ending at pos ' + str(input_file.tell())) + parse_and_queue_line(line, collector_queue) + line = '' + start_stat = os.stat(filename) + cur_pos = input_file.tell() + else: + logging.debug('readline reported line with no \n?') else: - logging.fatal('Was passed unknown argument: %s', argument[0]) - next - timestamp = datetime.now() - to_subtract - tmpstore = datastore.loc[datastore['timestamp'] > timestamp] - query_response.put(tmpstore.loc[tmpstore['zone'] == request].shape[0]) + #Using got_signal with a timeout of sleep_sec to rate limit the loopiness of this loop:) + any_signal = got_signal(signal_queue, sleep_sec) + if any_signal == 'Quit': + shutdown_filefetcher(collector_queue, input_file) + logging.critical('Shutdown filefetcher') + return True + now_stat = os.stat(filename) + if now_stat.st_ino != start_stat.st_ino: + if now_stat.st_ctime == start_stat.st_ctime: + #Strange, inode has changed, but ctime is the same? + if now_stat.st_size >= start_stat.st_size: + logging.warning('New inode number, but same ctime? Not sure how to handle this. Reopening, but keeping seek position...') + else: + logging.warning('New inode number, same ctime, but smaller? Much confuse, starting from beginning..') + seek_pos = 0 + else: + logging.debug('File have new inode number, restarting read from start') + seek_pos = 0 + break + if now_stat.st_size < start_stat.st_size: + logging.debug('File is now smaller than last read, restarting from start') + seek_pos = 0 + break + except KeyboardInterrupt: + shutdown_filefetcher(collector_queue, input_file) + logging.debug('KeyboardInterrupt, closing file and quitting') + return False + except FileNotFoundError: + '''Input file gone-gone during loop, retry opening a few times''' + logging.debug('File gone away') + next - except (queue.Empty, KeyboardInterrupt): - pass - try: - #For the log parser... - collector_query = collector_queue.get(False) - tmpstore = pd.DataFrame(collector_query) - if datastore is None: - datastore = tmpstore - else: - datastore = pd.concat([datastore, tmpstore], ignore_index=True) - except (queue.Empty, KeyboardInterrupt): - pass - try: - #Last, the signals.. - signal_query = signal_queue.get(False) - if signal_query == 'Quit': - logging.debug('DataStore quitting') - break - except (queue.Empty, KeyboardInterrupt): - pass +def shutdown_filefetcher(output_queue, input_file): + '''Cleanly close filehandles, save log position and queue contents''' + cur_pos = input_file.tell() + input_file.close() + with open('position', 'w') as output_file: + logging.debug('Saving current position ' + str(cur_pos)) + output_file.write(str(cur_pos)) + dump_queue(output_queue) -def filefetcher(filename: str, collector_queue, sleep_sec=0.1): - '''Latch onto a file, putting any new lines onto the queue''' - line = '' - with open(filename, 'r') as input_file: - logging.debug('Following ' + str(input_file)) - #Start at the end of the file - input_file.seek(0, io.SEEK_END) - while True: - tmp = input_file.readline() - if tmp is not None and tmp != '': - line += tmp - if line.endswith("\n"): - line.rstrip() - to_ret = parse_line(line) - collector_queue.put(to_ret) - line = '' - elif sleep_sec: - time.sleep(sleep_sec) +def got_signal(signal_queue: Queue, sleep_sec: float): + '''Read from signal_queue with a timeout of sleep_sec, + returns either a signal name (whatever text string the queue gave us), or None''' + try: + any_signal = signal_queue.get(timeout=sleep_sec) + logging.critical('Got ' + any_signal) + return any_signal + except queue.Empty: + return None + +def parse_and_queue_line(line: str, collector_queue): + '''Parse and queue the result''' + to_ret = parse_line(line) + if to_ret: + collector_queue.put(to_ret) def parse_line(input_line: str) -> dict: - '''Parse input line into (at least) the following details: - zone: - net_dnat or loc_net - SRC: - source ip - DST: - destination ip - PROTO: - protcol - SPT: - source port - DPT - destination port''' - retval = {} - input_line = input_line.split("\n")[0] #No idea why it appends a newline if I don't do this? - space_split = input_line.split(' ') - retval['zone'] = space_split[4] - for stringy in space_split: - try: - key, val = stringy.split('=') - retval[str(key)] = [str(val)] - except ValueError: - #Log entries are not perfectly symmetrical. We don't care - pass - retval['timestamp'] = datetime.now() + '''Fetch fourth space-separated item as zone, prepend with unix timestamp from the 3 first items''' + #Input line: Month DayOfMonth HH:MM:SS hostname zone.... + #The input line is in the local timezone + #Seems like python known about our timezone, so when converting to unix timestamp from random text + #it should handle changes from CET to CEST + try: + input_line = input_line.split("\n")[0] #No idea why it appends a newline if I don't do this? + space_split = input_line.split(' ') + month = space_split[0] + dateint = space_split[1] + timestr = space_split[2] + zone = space_split[4] + now = datetime.now() + except IndexError: + return None + logline_time = datetime.strptime(str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr), '%Y %b %d %H:%M:%S') + #If this is in the future, this probably means the data is from last year + if logline_time > now: + #We're in the future. Prettu sure the future is not yet, we're in last year + #This might happen around first of january + logline_time = logline_time.replace(year=logline_time.year - 1) + timestamp = int(logline_time.timestamp()) + retval = (timestamp, zone) logging.debug('Parsed line to ' + str(retval)) return retval - - -class RequestHandler(server.SimpleHTTPRequestHandler): - '''Subclassing to change behaviour''' - def send_head(self): - self.send_response(HTTPStatus.OK) - self.send_header("Content-type", "text/html") - self.end_headers() - - def do_GET(self): - if self.path == '/favicon.ico': - self.send_response(HTTPStatus.NOT_FOUND); - self.end_headers() - return - self.send_head() - r = [] - r.append('') - r.append('') - r.append('Statistics') - r.append('') - r.append('\n') - r.extend(self.fill_body()) - r.append('\n') - r.append('\n') - encoded = '\n'.join(r).encode('UTF-8', 'surrogateescape') - f = io.BytesIO() - f.write(encoded) - f.seek(0) - self.copyfile(f, self.wfile) - f.close() - - def fetch_data(self, command: str, argument: list): - '''Communicate with datastore via queue''' - r = [] - if type(argument) is str: - arg = [] - arg.append(argument) - argument = arg - str_args = [] - for args in argument: - str_args.append(str(args)) - self.server.queue_out.put(command + "\n" + "\n".join(str_args)) +def dump_queue(queue_to_dump: Queue): + '''Write the contents of a queue to a list that we pickle to a file''' + #We use pickle, every entry in the queue is one entry in a list + if queue_to_dump.empty(): + #Nothing to save, nothing to do + logging.debug('Empty queue') + return + out_list = [] + while True: try: - retval = self.server.queue_in.get(timeout=1) + out_list.append(queue_to_dump.get_nowait()) except queue.Empty: - retval = 'query timeout' - logging.debug('Returning: %s', retval) - return str(retval) + break + if out_list: + logging.debug('Saving ' + str(len(out_list)) + ' entries to dump.pickle') + to_save = pickle.dumps(out_list) + with open('dump.pickle', 'wb') as output_file: + bytes_written = output_file.write(to_save) + logging.debug('Saved ' + str(len(out_list)) + ' entries, taking ' + str(bytes_written) + ' bytes') - def fetch_data_for_connections(self, direction: str): - r = [] - r.append('') - r.append('' + direction + '') - for tofetch in (['all'], ['minutes', '1'], ['hours', '1'], ['days', '1'], ['weeks', 1], ['months', '1']): - retval = self.fetch_data(direction, tofetch) - r.append('' + str(retval) + '') - r.append('') - return r +def signal_handler(signum, frame): + '''Handle signals in a sensible way, I guess?''' + if signum == signal.SIGTERM: + logging.critical('Asked to quit') + raise TimeToQuit('Received signal ' + signal.Signals(signum).name) - def fill_body(self): - r = [] - r.append('

Statistics

') - r.append('') - for direction in ('net_dnat', 'loc-net'): - r.extend(self.fetch_data_for_connections(direction)) - r.append('
DirectionAllLast minuteLast hourLast dayLast weekLast month
') - return r +def load_pickled_file(output_queue): + '''Load queue contents from pickled queue structure''' + #Does our dump file exist? + loadfile = 'dump.pickle' + if os.path.isfile(loadfile): + size = os.stat(loadfile).st_size + logging.debug(loadfile + ' exists, loading ' + str(size) + ' bytes.') + #This is already parsed lines, dump them straight into the file_parser_result_queue + #Saved format is [(timestamp, parseresult), ..] + with open(loadfile, 'rb') as input_file: + #Yes, I know, loads is unsafe:) + loaded_data = pickle.loads(input_file.read()) + for entry in loaded_data: + output_queue.put(entry) + logging.debug('Put ' + str(len(loaded_data)) + ' entries on the queue') + logging.debug('Deleting old dump') + os.unlink(loadfile) -def serve_http(httpd_port: int, queue_in, queue_out): - TCPServer.allow_reuse_address = True - with server.ThreadingHTTPServer(("", httpd_port), RequestHandler) as httpd: - httpd.queue_in = queue_in - httpd.queue_out = queue_out - logging.info("serving at port %s", httpd_port) - httpd.serve_forever() +def load_start_pos(logfile): + '''Read start position from file, if it exists''' + #Do we have any position we want to start from? + if os.path.isfile('position'): + with open('position', 'r') as input_file: + tmp_start_pos = input_file.readline() + try: + tmp_start_pos = int(tmp_start_pos) + except ValueError: + logging.error('Could not parse ' + str(tmp_start_pos) + ' as an integer') + return None + logging.debug('Loaded position ' + str(tmp_start_pos)) + size = os.stat(logfile).st_size + logging.debug('log file size is ' + str(size)) + if tmp_start_pos <= size: + return tmp_start_pos + return None + +class RequestHandler(socketserver.BaseRequestHandler): + '''derived BaseRequestHandler''' + def handle(self): + logging.info('Connected to ' + str(self.client_address[0])) + self.request.settimeout(5) + start_time = datetime.now() + while True: + try: + if self.overflowqueue.empty != True: + tosend = self.overflowqueue.get_nowait() + if tosend: + self.send(tosend) + except queue.Empty: + pass + try: + if self.input_queue.empty != True: + event = self.input_queue.get(timeout=1) + tosend = str(event[0]) + ' ' + event[1] + try: + self.send(tosend) + except BrokenPipeError: + logging.error('Client gone') + self.overflowqueue.put(tosend) + break + try: + peer = self.request.getpeername() + except OSError as error: + logging.error('Client gone') + self.overflowqueue.put(tosend) + break + except queue.Empty: + pass + + try: + signal = self.signal_queue.get_nowait() + logging.debug('Signal: ' + str(signal)) + if signal == 'Quit': + logging.info('Asked to quit') + break + except queue.Empty: + pass + + now_time = datetime.now() + diff_time = now_time - start_time + if diff_time.total_seconds() >= 30: + start_time = now_time + #Long time, no see, time to pingpong the client:) + if self.ping_client() != True: + break + logging.debug('Request abandoned') + + def send(self, tosend): + '''Wrap sendall''' + logging.debug('Sending ' + str(tosend)) + self.request.sendall(bytes(tosend + "\n", 'utf-8')) + + def set_queue(self, input_queue, overflowqueue, signal_queue): + '''Set Queue for fetching events''' + self.input_queue = input_queue + self.overflowqueue = overflowqueue + self.signal_queue = signal_queue + + def ping_client(self): + '''Send ping to a client, expect pong back, else close socket''' + try: + self.send('ping') + except BrokenPipeError: + #Someone closed our socket + logging.debug('Broken pipe') + return False + + try: + response = self.request.recv(1024) + if response: + if response.strip() == b'pong': + logging.debug('Client said pong, all good') + return True + else: + logging.debug('No reply') + return False + except TimeoutError: + logging.debug('Timeout') + return False + except ConnectionResetError: + logging.debug('Connection reset, closing socket') + self.request.close() + return False + + except OSError as error: + if str(error) == 'timed out': + #Client probably just slacks + logging.debug('Timeout') + else: + logging.error('Peer gone?: ' + str(error)) + return False + +def socket_server(file_parser_result_queue, overflowqueue, socket_server_signal_queue): + '''Socket server sending whatever data is in the queue to any client connecting''' + #Multiple connections here is probably a horrible idea:) + setproctitle('routerstats-collector socket_server') + host, port = '', 9999 + while True: + try: + socketserver.TCPServer.allow_reuse_address = True + socketserver.TCPServer.allow_reuse_port = True + server = socketserver.TCPServer((host, port), RequestHandler) + server.timeout = 1 + with server: + server.RequestHandlerClass.set_queue(server.RequestHandlerClass, file_parser_result_queue, overflowqueue, socket_server_signal_queue) + logging.info('Socket up at ' + host + ':' + str(port)) + while True: + try: + logging.debug('Waiting for request') + server.handle_request() + except KeyboardInterrupt: + logging.debug('Received KeyboardInterrupt') + try: + server.server_close() + except Exception as e: + logging.exception(e) + return + except ValueError: + #This seems to happen whenever the socket is closed somewhere else, but handle_request still runs + try: + server.server_close() + except Exception as e: + logging.exception(e) + break + try: + recvd_signal = socket_server_signal_queue.get_nowait() + if recvd_signal == 'Quit': + logging.debug('Received Quit') + server.server_close() + return + except queue.Empty: + pass + except OSError as error: + logging.info('Waiting for Address to become available: ' + str(error)) + time.sleep(1) + continue def main(): '''Main thingy''' - httpd_port = 8000 - file_to_follow = '/var/log/ulog/syslogemu.log' - query_queue = Queue() - query_response = Queue() - collector_queue = Queue() - signal_queue = Queue() + setproctitle('routerstats-collector main-thread') + logging.debug('Starting as PID ' + str(os.getpid())) + file_to_follow = 'syslogemu.log' + #Just quit early if file is missing.. + if os.path.isfile(file_to_follow) is False: + logging.error('Could not find file ' + file_to_follow) + sys.exit() + file_parser_result_queue = Queue() + file_parser_signal_queue = Queue() + overflowqueue = Queue() + socket_server_signal_queue = Queue() started_processes = [] + dead_processes = 0 - datastore_process = Process(target=datathingy, args=(query_queue, query_response, collector_queue, signal_queue)) - datastore_process.start() - started_processes.append(datastore_process) + load_pickled_file(file_parser_result_queue) + start_pos = load_start_pos(file_to_follow) - collector_process = Process(target=filefetcher, args=(file_to_follow, collector_queue)) - collector_process.start() - started_processes.append(collector_process) + file_parser_process = Process(target=filefetcher, daemon=True, args=(file_to_follow, file_parser_result_queue, file_parser_signal_queue, 0.5, start_pos)) + file_parser_process.start() + logging.debug('Started filefetcher as pid ' + str(file_parser_process.pid)) + started_processes.append((file_parser_process, file_parser_signal_queue)) - http_process = Process(target=serve_http, args=(httpd_port, query_response, query_queue)) - http_process.start() - started_processes.append(http_process) + #We're not writing directly to an rrd, + #This goes out to a socket + #As soon as a client connects to the socket, we send what we have in queue + #Client can do whatever it wants with this + #This means any "malicious" connections will wipe the history + #We're fine with this + + socket_server_process = Process(target=socket_server, daemon=True, args=(file_parser_result_queue, overflowqueue, socket_server_signal_queue)) + socket_server_process.start() + logging.debug('Socket server started as pid ' + str(socket_server_process.pid)) + started_processes.append((socket_server_process, socket_server_signal_queue)) + + #rrd_stuffer_process = Process(target=rrd_stuffer, args(file_parser_result_queue, rrd_stuffer_signal_queue)) + #rrd_stuffer_process.start() + #started_prcesses.append(rrd_stuffer_process) + #signal_queues.append(rrd_stuffer_signal_queue) + + signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:) #No idea how to manage this better? while True: try: for p in started_processes: - if p.is_alive(): + if p[0].is_alive(): pass + else: + logging.error(p[0].name + ' has died prematurely?') + p[0].join() + dead_processes += 1 + if dead_processes >= len(started_processes): + logging.error('All processes has gone away :/') + sys.exit() time.sleep(0.1) - except KeyboardInterrupt: + except (KeyboardInterrupt, TimeToQuit): for p in started_processes: - if p.is_alive(): - retval = p.join(timeout=1) - if retval is None: - p.kill() + if p[0].is_alive(): + p[1].put('Quit') + p[0].join(timeout=5) + if p[0].is_alive(): + logging.error('Timeout waiting for shutdown, killing child PID: ' + str(p[0].pid)) + p[0].kill() break if __name__ == '__main__':