#!/usr/bin/env python3
'''Collect all the things

Follows a syslog-style log file in one child process, parses each line into
a (unix_timestamp, zone) tuple, and serves the parsed events to TCP clients
from a second child process. State (file position, unsent queue contents)
is persisted to a var directory across restarts.
'''
import signal
import os
import io
import sys
from multiprocessing import Process, Queue
import socketserver
import queue
from datetime import datetime
import logging
import time
import pickle
import configparser
import argparse
from setproctitle import setproctitle

logging.basicConfig(
    format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')


class TimeToQuit(Exception):
    '''Used to pass Quit to subthreads'''


class ReloadLog(Exception):
    '''Used to reload log file'''


def wait_for_file(filename: str):
    '''Busy loop waiting for file to exist.

    Returns the open text-mode file handle once the file can be opened.
    Only returns when a handle was obtained; sleeps with a linear backoff
    (capped at 10 seconds) between attempts.
    '''
    loops = 0
    logging.debug('Trying to open file %s', filename)
    while True:
        try:
            #Caller takes ownership of the handle and is responsible for closing it
            input_file = open(filename, 'r', encoding='utf-8')
            return input_file
        except FileNotFoundError:
            logging.debug('%s not found, sleeping and retrying', filename)
            loops += 1
            loops = min(loops, 10)
            time.sleep(loops)


def fetch_line_from_file(filehandle):
    '''Loop until we have formed a complete line from filehandle.

    Returns the line INCLUDING its trailing newline (callers strip it),
    skipping whitespace-only lines. Returns None as soon as readline
    reports no (more) data, so the caller can poll/rotate-check.
    '''
    line = ''
    while True:
        tmp = filehandle.readline()
        if tmp is not None and tmp != '':
            line += tmp
            if line.endswith("\n"):
                #The trailing newline is kept on purpose: isspace() on "\n"
                #is True, which is how empty lines are detected below.
                if line.isspace():
                    logging.debug('Empty line is empty, thank you for the newline')
                else:
                    return line
                line = ''
            else:
                #Partial line: the writer has not finished it yet, keep
                #accumulating on the next readline.
                logging.debug('readline reported line with no \n?')
        else:
            #Either readline returned None, or ''
            return None


def filefetcher(
        filename: str, output_directory: str, collector_queue: Queue,
        signal_queue: Queue, sleep_sec=0.5, seek_pos=None):
    '''Latch onto a file, putting any new lines onto the queue.

    Runs as a child process. Handles log rotation (inode change) and
    truncation (shrinking size) by reopening and/or restarting from the
    beginning. On Quit/KeyboardInterrupt the current position and any
    queued-but-unsent events are persisted under output_directory.

    :param filename: log file to follow
    :param output_directory: where 'position' and 'dump.pickle' are written
    :param collector_queue: parsed events go here
    :param signal_queue: reads 'Quit' to shut down cleanly
    :param sleep_sec: poll interval when the file has no new data (> 0)
    :param seek_pos: byte offset to resume from, or None for start of file
    '''
    setproctitle('routerstats-collector file-fetcher')
    if float(sleep_sec) <= 0.0:
        logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)')
        return False
    while True:
        try:
            input_file = wait_for_file(filename)
            if not input_file:
                #Something is very wrong, the above function should only return
                #whenever it has opened a file
                logging.critical('Failed to open %s', filename)
                break
            start_stat = os.stat(filename)
            if seek_pos is None:
                cur_pos = input_file.seek(0, 0)
            else:
                cur_pos = input_file.seek(seek_pos, io.SEEK_SET)
            logging.info(
                'Following %s (inode %s) from pos %s',
                filename, start_stat.st_ino, cur_pos)
            while True:
                line = fetch_line_from_file(input_file)
                if line:
                    logging.debug('Parsing line ending at pos %s', input_file.tell())
                    parse_and_queue_line(line, collector_queue)
                    start_stat = os.stat(filename)
                    cur_pos = input_file.tell()
                else:
                    #Using got_signal with a timeout of sleep_sec to rate limit
                    #the loopiness of this loop:)
                    any_signal = got_signal(signal_queue, sleep_sec)
                    if any_signal == 'Quit':
                        shutdown_filefetcher(collector_queue, input_file, output_directory)
                        logging.critical('Shutdown filefetcher')
                        return True
                    now_stat = os.stat(filename)
                    if now_stat.st_ino != start_stat.st_ino:
                        #Inode has changed — rotation. We break out to reopen;
                        #the ifs below only decide whether to reset seek_pos.
                        if now_stat.st_ctime == start_stat.st_ctime:
                            #Strange, inode has changed, but ctime is the same?
                            if now_stat.st_size >= start_stat.st_size:
                                logging.warning(
                                    'New inode number, but same ctime? '
                                    'Not sure how to handle this. Reopening, but keeping seek position...')
                            else:
                                logging.warning(
                                    'New inode number, same ctime, but smaller? '
                                    'Much confuse, starting from beginning..')
                                seek_pos = 0
                        else:
                            logging.debug('File have new inode number, restarting read from start')
                            seek_pos = 0
                        break
                    if now_stat.st_size < start_stat.st_size:
                        #Truncated in place
                        logging.debug('File is now smaller than last read, restarting from start')
                        seek_pos = 0
                        break
        except KeyboardInterrupt:
            shutdown_filefetcher(collector_queue, input_file, output_directory)
            logging.debug('KeyboardInterrupt, closing file and quitting')
            return False
        except FileNotFoundError:
            logging.debug('File gone away')
            #Save progress, and hope it returns some day:)
            seek_pos = save_current_pos(input_file, output_directory)


def save_current_pos(input_file, output_directory):
    '''Persist the current read offset and close the handle.

    Writes the offset as text to <output_directory>position and returns it.
    '''
    cur_pos = input_file.tell()
    input_file.close()
    with open(output_directory + 'position', 'w', encoding='utf-8') as output_file:
        logging.debug('Saving current position %s to %s', cur_pos, output_directory + 'position')
        output_file.write(str(cur_pos))
    return cur_pos


def shutdown_filefetcher(output_queue, input_file, output_directory):
    '''Cleanly close filehandles, save log position and queue contents'''
    save_current_pos(input_file, output_directory)
    dump_queue(output_queue, output_directory + 'dump.pickle')


def got_signal(signal_queue: Queue, sleep_sec: float):
    '''Read from signal_queue with a timeout of sleep_sec.

    Returns a signal name (whatever text string the queue gave us),
    or None on timeout. Doubles as the poll-loop sleep.
    '''
    try:
        any_signal = signal_queue.get(timeout=sleep_sec)
        logging.critical('Got %s', any_signal)
        return any_signal
    except queue.Empty:
        return None


def parse_and_queue_line(line: str, collector_queue):
    '''Parse and queue the result; unparseable lines are dropped'''
    to_ret = parse_line(line)
    if to_ret:
        collector_queue.put(to_ret)


def parse_line(input_line: str):
    '''Parse one syslog-style line into (unix_timestamp, zone) or None.

    The fifth space-separated item is the zone; the timestamp comes from
    the first three items (Month DayOfMonth HH:MM:SS) combined with the
    current year.
    '''
    #Input line: Month DayOfMonth HH:MM:SS hostname zone....
    #The input line is in the local timezone
    #Seems like python knows about our timezone, so when converting to unix
    #timestamp from random text it should handle changes from CET to CEST
    try:
        #fetch_line_from_file keeps the trailing newline; drop it here
        input_line = input_line.split("\n")[0]
        space_split = input_line.split()
        month = space_split[0]
        dateint = space_split[1]
        timestr = space_split[2]
        zone = space_split[4]
        now = datetime.now()
    except IndexError:
        return None
    try:
        logline_time = datetime.strptime(
            str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr),
            '%Y %b %d %H:%M:%S')
    except ValueError:
        logging.error('Could not parse line %s', input_line)
        return None
    #If this is in the future, this probably means the data is from last year
    if logline_time > now:
        #We're in the future. Pretty sure the future is not yet, we're in last year
        #This might happen around first of january
        logline_time = logline_time.replace(year=logline_time.year - 1)
    timestamp = int(logline_time.timestamp())
    #FIXME Make this configurable somehow
    if zone in ['loc-net', 'router-net']:
        zone = 'loc-net'
    retval = (timestamp, zone)
    logging.debug('Parsed line to %s', retval)
    return retval


def dump_queue(queue_to_dump: Queue, dumpfile):
    '''Write the contents of a queue to a list that we pickle to a file.

    Drains the queue; every entry in the queue is one entry in the list.
    '''
    if queue_to_dump.empty():
        #Nothing to save, nothing to do
        logging.debug('Empty queue')
        return
    out_list = []
    while True:
        try:
            out_list.append(queue_to_dump.get_nowait())
        except queue.Empty:
            break
    if out_list:
        logging.debug('Saving %s entries to %s', len(out_list), dumpfile)
        to_save = pickle.dumps(out_list)
        with open(dumpfile, 'wb') as output_file:
            bytes_written = output_file.write(to_save)
        logging.debug('Saved %s entries, taking %s bytes', len(out_list), bytes_written)


def signal_handler(signum, _):
    '''Handle signals in a sensible way, I guess?

    Only SIGTERM triggers shutdown (via TimeToQuit).
    NOTE(review): this handler is also installed for SIGINT in main(),
    where it does nothing — Ctrl-C is effectively swallowed; confirm
    whether that is intended.
    '''
    if signum == signal.SIGTERM:
        logging.critical('Asked to quit')
        raise TimeToQuit('Received signal ' + signal.Signals(signum).name)


def load_pickled_file(output_queue, loadfile):
    '''Load queue contents from pickled queue structure.

    If loadfile exists, its entries are pushed onto output_queue and the
    file is deleted afterwards.
    '''
    #Does our dump file exist?
    if os.path.isfile(loadfile):
        size = os.stat(loadfile).st_size
        logging.debug('%s exists, loading %s bytes.', loadfile, size)
        #This is already parsed lines, dump them straight into the file_parser_result_queue
        #Saved format is [(timestamp, parseresult), ..]
        with open(loadfile, 'rb') as input_file:
            #Yes, I know, loads is unsafe:) — only ever fed our own dump file,
            #but anyone who can write to var_dir can run code here.
            loaded_data = pickle.loads(input_file.read())
        for entry in loaded_data:
            output_queue.put(entry)
        logging.debug('Put %s entries on the queue', len(loaded_data))
        logging.debug('Deleting old dump')
        os.unlink(loadfile)


def load_start_pos(logfile, position_file):
    '''Read start position from file, if it exists.

    Returns the saved integer offset if it is still within the current
    size of logfile, otherwise None (start from the beginning).
    '''
    #Do we have any position we want to start from?
    if os.path.isfile(position_file):
        with open(position_file, 'r', encoding='utf-8') as input_file:
            tmp_start_pos = input_file.readline()
        try:
            tmp_start_pos = int(tmp_start_pos)
        except ValueError:
            logging.error('Could not parse %s as an integer', tmp_start_pos)
            return None
        logging.debug('Loaded position %s', tmp_start_pos)
        size = os.stat(logfile).st_size
        logging.debug('log file size is %s', size)
        if tmp_start_pos <= size:
            return tmp_start_pos
    return None


class RequestHandler(socketserver.BaseRequestHandler):
    '''derived BaseRequestHandler

    Streams parsed events to an authenticated client. Queues are injected
    class-wide via set_queue()/set_passwd_file() before the server starts.
    '''

    def set_passwd_file(self, filename):
        '''Make us able to set attributes on derived classes'''
        self.passwd_file = filename #pylint: disable=attribute-defined-outside-init

    def check_login(self, answer):
        '''Check if what client say is the password matches against passwd_file contents'''
        with open(self.passwd_file, 'r', encoding='utf-8') as passwd_file:
            passwd = passwd_file.readline()
        passwd = passwd.rstrip() #Remove that newline
        try:
            answer = answer.decode('utf-8')
        except UnicodeDecodeError as error:
            #answer stays bytes, so the comparison below is False
            logging.error('Could not decode %s as unicode: %s', answer, str(error))
        if answer == passwd:
            return True
        return False

    def login(self):
        '''Run login procedure.

        Returns True on success, None on auth failure/timeout, False when
        the connection broke — all falsy-except-True, callers use `not`.
        '''
        try:
            self.request.send(b'Hello')
            try:
                answer = self.request.recv(1024)
            except TimeoutError:
                #Client did not even bother to reply...
                logging.warning('Timed out during auth')
                self.request.send(b'timeout')
                return None
            if not self.check_login(answer):
                logging.warning('Wrong passphrase')
                self.request.send(b'auth error')
                return None
            self.request.send(b'Welcome')
            logging.info('Client %s logged in', self.client_address[0])
            return True
        except BrokenPipeError:
            #Client gone and came back, bad idea.
            logging.warning('Broken pipe, closing socket')
            return False
        except ConnectionResetError:
            #Other end closed socket, we're ok
            logging.warning('Connection reset by peer')
            return False
        return None

    def handle(self):
        '''Serve one client: drain overflow, relay new events, honour Quit,
        and ping idle clients every 30 seconds.'''
        logging.info('Connected to %s', self.client_address[0])
        self.sendt_lines = 0 #pylint: disable=attribute-defined-outside-init
        self.request.settimeout(5)
        start_time = datetime.now()
        if not self.login():
            self.request.close()
            return
        while True:
            #Bugfix: this used to test `self.overflowqueue.empty is not True`,
            #comparing the bound method (always truthy) instead of calling it.
            #The queue.Empty handlers make the guards redundant, so they are gone.
            try:
                while True:
                    tosend = self.overflowqueue.get_nowait()
                    if tosend:
                        logging.debug('Sending %s from overflowqueue', tosend)
                        self.send(tosend)
            except queue.Empty:
                pass
            try:
                #The 1-second timeout doubles as the loop's rate limiter
                event = self.input_queue.get(timeout=1)
                tosend = str(event[0]) + ' ' + event[1]
                try:
                    self.send(tosend)
                except (BrokenPipeError, ConnectionResetError, TimeoutError) as error:
                    logging.error('Client gone: %s', error)
                    self.overflowqueue.put(tosend)
                    break
                try:
                    _ = self.request.getpeername()
                except OSError:
                    #Socket is dead even though sendall did not complain
                    logging.error('Client gone')
                    self.overflowqueue.put(tosend)
                    break
                start_time = datetime.now()
            except queue.Empty:
                pass
            try:
                rcvd_signal = self.signal_queue.get_nowait()
                logging.debug('Signal: %s', rcvd_signal)
                if rcvd_signal == 'Quit':
                    logging.info('Asked to quit')
                    break
            except queue.Empty:
                pass
            now_time = datetime.now()
            diff_time = now_time - start_time
            if diff_time.total_seconds() >= 30:
                start_time = now_time
                #Long time, no see, time to pingpong the client:)
                if self.ping_client() is not True:
                    break
        logging.info('Request abandoned')

    def send(self, tosend):
        '''Wrap sendall'''
        logging.debug('Sending %s', tosend)
        self.request.sendall(bytes(tosend + "\n", 'utf-8'))
        self.sendt_lines += 1
        if not self.sendt_lines % 1000:
            logging.info('Sendt %s lines', self.sendt_lines)

    def set_queue(self, input_queue, overflowqueue, signal_queue):
        '''Set Queue for fetching events'''
        self.input_queue = input_queue #pylint: disable=attribute-defined-outside-init
        self.overflowqueue = overflowqueue #pylint: disable=attribute-defined-outside-init
        self.signal_queue = signal_queue #pylint: disable=attribute-defined-outside-init

    def ping_client(self):
        '''Send ping to a client, expect pong back, else close socket.

        Returns True when the client answered pong, False otherwise.
        '''
        try:
            self.send('ping')
        except BrokenPipeError:
            #Someone closed our socket
            logging.debug('Broken pipe')
            return False
        try:
            response = self.request.recv(1024)
            if response:
                if response.strip() == b'pong':
                    logging.debug('Client said pong, all good')
                    return True
            else:
                logging.debug('No reply')
        except TimeoutError:
            logging.debug('Timeout')
        except ConnectionResetError:
            logging.debug('Connection reset, closing socket')
            self.request.close()
        except OSError as error:
            if str(error) == 'timed out':
                #Client probably just slacks
                logging.debug('Timeout')
            else:
                logging.error('Peer gone?: %s', error)
        return False


def socket_server(
        file_parser_result_queue, overflowqueue, socket_server_signal_queue,
        passwd_file, server_port):
    '''Socket server sending whatever data is in the queue to any client connecting.

    Runs as a child process; binds to all interfaces on server_port and
    retries once a second while the address is unavailable.
    '''
    #Multiple connections here is probably a horrible idea:)
    setproctitle('routerstats-collector socket_server')
    host, port = '', server_port
    while True:
        try:
            socketserver.TCPServer.allow_reuse_address = True
            socketserver.TCPServer.allow_reuse_port = True
            server = socketserver.TCPServer((host, port), RequestHandler)
            server.timeout = 1
            with server:
                #Inject shared state class-wide; handler instances are created
                #per connection so instance attributes would not survive.
                server.RequestHandlerClass.set_queue(
                    server.RequestHandlerClass, file_parser_result_queue,
                    overflowqueue, socket_server_signal_queue)
                server.RequestHandlerClass.set_passwd_file(server.RequestHandlerClass, passwd_file)
                logging.info('Socket up at %s:%s', host, port)
                while True:
                    try:
                        server.handle_request()
                    except KeyboardInterrupt:
                        logging.debug('Received KeyboardInterrupt')
                        server.server_close()
                        return
                    except ValueError:
                        #This seems to happen whenever the socket is closed somewhere else,
                        #but handle_request still runs
                        server.server_close()
                        break
                    try:
                        recvd_signal = socket_server_signal_queue.get_nowait()
                        if recvd_signal == 'Quit':
                            logging.warning('Shutting down socketserver')
                            server.server_close()
                            return
                    except queue.Empty:
                        pass
        except OSError as error:
            logging.info('Waiting for Address to become available: %s', error)
            time.sleep(1)
            continue


def main():
    '''Main thingy: parse config/args, spawn the two worker processes,
    and babysit them until asked to quit.'''
    config_section = 'collector'
    setproctitle('routerstats-collector main-thread')
    config = configparser.ConfigParser()
    parser = argparse.ArgumentParser(exit_on_error=False, add_help=False)
    parser.add_argument('-c', '--config', help='config file to load')
    parser.add_argument('-d', '--debug', action='store_true', help='enable debug')
    #parse_known_args: config must be loaded before the remaining options can
    #get their defaults from it
    args, _ = parser.parse_known_args()
    if args.debug:
        logging.root.setLevel(logging.DEBUG)
    logging.debug('Starting as PID %s', os.getpid())
    found = False
    config_dirs = ('/etc/routerstats/', '/usr/local/etc/routerstats/', '/opt/routerstats/', './')
    if args.config:
        if os.path.isfile(args.config):
            config.read(args.config)
            found = True
        else:
            logging.error('Specified config file does not exist: %s', args.config)
    else:
        logging.debug('Trying to find config')
        #Try to find in "usual" places
        for directory in config_dirs:
            trytoread = directory + 'routerstats.config'
            if os.path.isfile(trytoread):
                logging.debug('Reading config file %s', trytoread)
                config.read(trytoread)
                found = True
    if not found:
        logging.error('routerstats.config not found in %s', config_dirs)
        sys.exit(0)
    parser.add_argument(
        '-f', '--file', dest='file_to_follow', help='Log file to follow',
        default=config[config_section]['logfile'])
    parser.add_argument(
        '-w', '--pwdfile', dest='passwd_file', help='password file',
        default=config[config_section]['passwd_file'])
    parser.add_argument(
        '-p', '--port', dest='server_port', type=int, help='tcp port to listen to',
        default=config[config_section]['port'])
    parser.add_argument(
        '-v', '--vardir', dest='var_dir', help='Location for queue dumps',
        default=config[config_section]['var_dir'])
    parser.add_argument(
        '-h', '--help', help='show this help and exit', action='store_true',
        default=False)
    args = parser.parse_args()
    logging.debug(args)
    if args.help:
        parser.print_help()
        sys.exit()
    #Just quit early if file is missing..
    if not os.path.isfile(args.file_to_follow):
        logging.error('Could not find log file %s', args.file_to_follow)
        sys.exit()
    if not os.path.isfile(args.passwd_file):
        logging.error('Could not find password file %s', args.passwd_file)
        sys.exit()
    if not args.server_port:
        logging.error('No TPC port to bind to')
        sys.exit()
    if not os.path.isdir(args.var_dir):
        logging.error('Could not find var dir %s', args.var_dir)
        sys.exit()
    file_parser_result_queue = Queue()
    file_parser_signal_queue = Queue()
    overflowqueue = Queue()
    socket_server_signal_queue = Queue()
    started_processes = []
    load_pickled_file(file_parser_result_queue, args.var_dir + '/dump.pickle')
    start_pos = load_start_pos(args.file_to_follow, args.var_dir + '/position')
    file_parser_process = Process(
        target=filefetcher, daemon=True,
        args=(
            args.file_to_follow, args.var_dir, file_parser_result_queue,
            file_parser_signal_queue, 0.5, start_pos))
    file_parser_process.start()
    logging.debug('Started filefetcher as pid %s', file_parser_process.pid)
    started_processes.append((file_parser_process, file_parser_signal_queue))
    #We're not writing directly to an rrd,
    #This goes out to a socket
    #As soon as a client connects to the socket, we send what we have in queue
    #Client can do whatever it wants with this
    #This means any "malicious" connections will wipe the history
    #We're fine with this
    socket_server_process = Process(
        target=socket_server, daemon=True,
        args=(
            file_parser_result_queue, overflowqueue, socket_server_signal_queue,
            args.passwd_file, args.server_port))
    socket_server_process.start()
    logging.debug('Socket server started as pid %s', socket_server_process.pid)
    started_processes.append((socket_server_process, socket_server_signal_queue))
    signal.signal(signal.SIGTERM, signal_handler)
    #Make sure subthreads get the info:)
    #NOTE(review): signal_handler ignores SIGINT, so installing it here
    #replaces the default KeyboardInterrupt — Ctrl-C is swallowed; confirm.
    signal.signal(signal.SIGINT, signal_handler)
    #No idea how to manage this better?
    while True:
        try:
            for p in started_processes:
                if p[0].is_alive():
                    pass
                else:
                    logging.error('%s has died prematurely?', p[0].name)
                    p[0].join()
                    raise TimeToQuit
            time.sleep(0.5)
        except (KeyboardInterrupt, TimeToQuit):
            logging.warning('Asked to quit via exception TimeToQuit')
            for p in started_processes:
                if p[0].is_alive():
                    p[1].put('Quit')
                    p[0].join(timeout=5)
                    if p[0].is_alive():
                        logging.error('Timeout waiting for shutdown, killing child PID: %s', p[0].pid)
                        p[0].kill()
            break


if __name__ == '__main__':
    main()