# Files
# routerstats/routerstats_collector.py
# 431 lines, 14 KiB, Python, executable file
#!/usr/bin/env python3
import signal
import os
import io
import sys
import stat
from multiprocessing import Process, Queue
import socketserver
import threading
import queue
from datetime import datetime, timedelta
import logging
import time
import pickle
from setproctitle import setproctitle
#Root-logger setup, inherited by every process this script forks.
logging.basicConfig(
    format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S')
class TimeToQuit(Exception):
    '''Raised (by the SIGTERM handler) to tell the main loop and subthreads to quit.'''
class ReloadLog(Exception):
    '''Raised to request a reload of the log file.'''
    #NOTE(review): appears unused in the visible part of this file — confirm before removing
def filefetcher(filename: str, collector_queue, signal_queue, sleep_sec=0.5, seek_pos=None):
    '''Latch onto a file (tail -F style), putting any new lines onto the queue.

    filename        -- path of the log file to follow
    collector_queue -- Queue that receives the parsed result of each line
    signal_queue    -- Queue polled for control messages; 'Quit' stops us
    sleep_sec       -- poll interval while waiting for new data; must be > 0
    seek_pos        -- byte offset to resume from, or None to start at the top

    Returns True on a clean 'Quit' shutdown, False on KeyboardInterrupt or
    bad arguments.
    '''
    setproctitle('routerstats-collector file-fetcher')
    if float(sleep_sec) <= 0.0:
        logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)')
        return False
    while True:
        try:
            input_file = open(filename, 'r')
        except FileNotFoundError:
            logging.debug('Retry opening ' + filename)
            time.sleep(1)
            continue
        line = ''
        start_stat = os.stat(filename)
        if seek_pos is None:
            cur_pos = input_file.seek(0, 0)
        else:
            cur_pos = input_file.seek(seek_pos, io.SEEK_SET)
        logging.info('Following ' + filename + ' (inode ' + str(start_stat.st_ino) + ') from pos ' + str(cur_pos))
        try:
            while True:
                tmp = input_file.readline()
                if tmp:
                    line += tmp
                    if line.endswith("\n"):
                        #BUGFIX: rstrip() returns a new string; the old code discarded
                        #the result, which is why lines reached the parser with a
                        #trailing newline attached
                        line = line.rstrip("\n")
                        if not line.strip():
                            logging.debug('Empty line is empty, thank you for the newline')
                        else:
                            logging.debug('Sending line ending at pos ' + str(input_file.tell()))
                            parse_and_queue_line(line, collector_queue)
                        line = ''
                        start_stat = os.stat(filename)
                        cur_pos = input_file.tell()
                    else:
                        #Partial line without trailing newline: keep accumulating
                        logging.debug('readline reported line with no \n?')
                else:
                    #Using got_signal with a timeout of sleep_sec to rate limit the loopiness of this loop:)
                    any_signal = got_signal(signal_queue, sleep_sec)
                    if any_signal == 'Quit':
                        shutdown_filefetcher(collector_queue, input_file)
                        logging.critical('Shutdown filefetcher')
                        return True
                    now_stat = os.stat(filename)
                    if now_stat.st_ino != start_stat.st_ino:
                        if now_stat.st_ctime == start_stat.st_ctime:
                            #Strange, inode has changed, but ctime is the same?
                            if now_stat.st_size >= start_stat.st_size:
                                logging.warning('New inode number, but same ctime? Not sure how to handle this. Reopening, but keeping seek position...')
                                #BUGFIX: actually keep the position; seek_pos may still
                                #hold the stale start-up value (or None, which would
                                #restart from offset 0)
                                seek_pos = cur_pos
                            else:
                                logging.warning('New inode number, same ctime, but smaller? Much confuse, starting from beginning..')
                                seek_pos = 0
                        else:
                            logging.debug('File have new inode number, restarting read from start')
                            seek_pos = 0
                        input_file.close()  #BUGFIX: don't leak the old handle on rotation
                        break
                    if now_stat.st_size < start_stat.st_size:
                        logging.debug('File is now smaller than last read, restarting from start')
                        seek_pos = 0
                        input_file.close()  #BUGFIX: don't leak the old handle on truncation
                        break
        except KeyboardInterrupt:
            shutdown_filefetcher(collector_queue, input_file)
            logging.debug('KeyboardInterrupt, closing file and quitting')
            return False
        except FileNotFoundError:
            #Input file gone-gone during loop (the os.stat above raised); close the
            #stale handle and retry opening.
            #BUGFIX: the old code leaked the filehandle and used a bare 'next'
            #expression (a no-op) where it meant 'continue'
            logging.debug('File gone away')
            input_file.close()
            continue
def shutdown_filefetcher(output_queue, input_file):
    '''Cleanly close the followed file, persist its read position, and dump the queue.'''
    last_pos = input_file.tell()
    input_file.close()
    with open('position', 'w') as position_file:
        logging.debug('Saving current position ' + str(last_pos))
        position_file.write(str(last_pos))
    dump_queue(output_queue)
def got_signal(signal_queue: Queue, sleep_sec: float):
    '''Poll signal_queue, waiting at most sleep_sec seconds.

    Returns the received signal name (whatever text string the queue gave us),
    or None when the queue stayed empty.'''
    try:
        received = signal_queue.get(timeout=sleep_sec)
    except queue.Empty:
        return None
    logging.critical('Got ' + received)
    return received
def parse_and_queue_line(line: str, collector_queue):
    '''Parse one log line and, when parsing succeeds, put the result on the queue.'''
    parsed = parse_line(line)
    if parsed:
        collector_queue.put(parsed)
def parse_line(input_line: str) -> 'tuple[int, str] | None':
    '''Parse a syslog-style line into (unix_timestamp, zone), or None when malformed.

    Input line: Month DayOfMonth HH:MM:SS hostname zone....
    The input line is in the local timezone. strptime/timestamp() also work in
    the local timezone, so changes from CET to CEST should be handled for us.
    '''
    #NOTE: the original return annotation said dict; this function returns a tuple
    try:
        #Callers may hand us a line with a trailing newline still attached;
        #keep only the first line
        input_line = input_line.split("\n")[0]
        space_split = input_line.split(' ')
        month = space_split[0]
        dateint = space_split[1]
        timestr = space_split[2]
        #space_split[3] is the hostname, which we don't use
        zone = space_split[4]
        now = datetime.now()
        #Syslog timestamps carry no year, so borrow the current one
        logline_time = datetime.strptime(str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr), '%Y %b %d %H:%M:%S')
    except IndexError:
        #Too few space-separated fields on the line
        return None
    except ValueError:
        #BUGFIX: a garbled date/time used to raise straight out of here and
        #kill the filefetcher process; treat it as an unparseable line instead
        logging.debug('Could not parse timestamp from ' + input_line)
        return None
    #If this is in the future, this probably means the data is from last year
    if logline_time > now:
        #We're in the future. Pretty sure the future is not yet, we're in last year
        #This might happen around first of january
        logline_time = logline_time.replace(year=logline_time.year - 1)
    timestamp = int(logline_time.timestamp())
    retval = (timestamp, zone)
    logging.debug('Parsed line to ' + str(retval))
    return retval
def dump_queue(queue_to_dump: Queue):
    '''Drain a queue into a list and pickle that list to dump.pickle.'''
    #We use pickle, every entry in the queue is one entry in a list
    if queue_to_dump.empty():
        #Nothing to save, nothing to do
        logging.debug('Empty queue')
        return
    entries = []
    while True:
        try:
            entries.append(queue_to_dump.get_nowait())
        except queue.Empty:
            break
    if not entries:
        #Queue drained to nothing between the empty() check and here
        return
    logging.debug('Saving ' + str(len(entries)) + ' entries to dump.pickle')
    with open('dump.pickle', 'wb') as output_file:
        bytes_written = output_file.write(pickle.dumps(entries))
    logging.debug('Saved ' + str(len(entries)) + ' entries, taking ' + str(bytes_written) + ' bytes')
def signal_handler(signum, frame):
    '''Handle signals in a sensible way, I guess?

    Only SIGTERM is acted on: it is converted into a TimeToQuit exception so
    the main loop can unwind cleanly. Everything else is ignored.'''
    if signum != signal.SIGTERM:
        return
    logging.critical('Asked to quit')
    raise TimeToQuit('Received signal ' + signal.Signals(signum).name)
def load_pickled_file(output_queue):
    '''Restore previously dumped queue contents from dump.pickle, then delete it.'''
    dump_path = 'dump.pickle'
    #Does our dump file exist? If not there is nothing to restore
    if not os.path.isfile(dump_path):
        return
    size = os.stat(dump_path).st_size
    logging.debug(dump_path + ' exists, loading ' + str(size) + ' bytes.')
    #This is already parsed lines, dump them straight into the file_parser_result_queue
    #Saved format is [(timestamp, parseresult), ..]
    with open(dump_path, 'rb') as input_file:
        #Yes, I know, loads is unsafe:)
        loaded_data = pickle.loads(input_file.read())
    for entry in loaded_data:
        output_queue.put(entry)
    logging.debug('Put ' + str(len(loaded_data)) + ' entries on the queue')
    logging.debug('Deleting old dump')
    os.unlink(dump_path)
def load_start_pos(logfile):
    '''Read the saved start position from the 'position' file, if usable.

    Returns the saved byte offset when it parses as an integer and still fits
    inside logfile, otherwise None.'''
    #Do we have any position we want to start from?
    if not os.path.isfile('position'):
        return None
    with open('position', 'r') as input_file:
        raw_pos = input_file.readline()
    try:
        saved_pos = int(raw_pos)
    except ValueError:
        logging.error('Could not parse ' + str(raw_pos) + ' as an integer')
        return None
    logging.debug('Loaded position ' + str(saved_pos))
    size = os.stat(logfile).st_size
    logging.debug('log file size is ' + str(size))
    #A position past the end means the file shrank; start over instead
    return saved_pos if saved_pos <= size else None
class RequestHandler(socketserver.BaseRequestHandler):
    '''Per-connection handler: streams queued (timestamp, zone) events to the client.

    The queues are installed as class attributes via set_queue() before the
    server starts handling requests.'''
    def handle(self):
        '''Pump events from the queues to the connected client until the client
        goes away, a Quit signal arrives, or a periodic ping goes unanswered.'''
        logging.info('Connected to ' + str(self.client_address[0]))
        self.request.settimeout(5)
        start_time = datetime.now()
        while True:
            #First drain anything that failed to send to an earlier client.
            #BUGFIX: the old guard was 'self.overflowqueue.empty != True', which
            #compares the bound method object to True and is always truthy, so it
            #never gated anything; get_nowait + queue.Empty is the real flow control
            try:
                tosend = self.overflowqueue.get_nowait()
                if tosend:
                    self.send(tosend)
            except queue.Empty:
                pass
            try:
                #The 1 second timeout doubles as the pacing for this loop
                #(BUGFIX: the old 'self.input_queue.empty != True' guard was the
                #same always-true method-vs-True comparison and has been dropped)
                event = self.input_queue.get(timeout=1)
                tosend = str(event[0]) + ' ' + event[1]
                try:
                    self.send(tosend)
                except BrokenPipeError:
                    logging.error('Client gone')
                    #Keep the event so the next client still receives it
                    self.overflowqueue.put(tosend)
                    break
                try:
                    #Probe the socket; raises OSError when the peer disconnected
                    self.request.getpeername()
                except OSError:
                    logging.error('Client gone')
                    self.overflowqueue.put(tosend)
                    break
            except queue.Empty:
                pass
            try:
                #BUGFIX: this local used to be named 'signal', shadowing the module
                ctrl = self.signal_queue.get_nowait()
                logging.debug('Signal: ' + str(ctrl))
                if ctrl == 'Quit':
                    logging.info('Asked to quit')
                    break
            except queue.Empty:
                pass
            now_time = datetime.now()
            if (now_time - start_time).total_seconds() >= 30:
                start_time = now_time
                #Long time, no see, time to pingpong the client:)
                if self.ping_client() != True:
                    break
        logging.debug('Request abandoned')
    def send(self, tosend):
        '''Wrap sendall: newline-terminate and utf-8 encode the payload.'''
        logging.debug('Sending ' + str(tosend))
        self.request.sendall(bytes(tosend + "\n", 'utf-8'))
    def set_queue(self, input_queue, overflowqueue, signal_queue):
        '''Set the Queues used for fetching events and control signals.'''
        self.input_queue = input_queue
        self.overflowqueue = overflowqueue
        self.signal_queue = signal_queue
    def ping_client(self):
        '''Send ping to a client, expect pong back; returns True when the client
        answered correctly, False when it should be considered gone.'''
        try:
            self.send('ping')
        except BrokenPipeError:
            #Someone closed our socket
            logging.debug('Broken pipe')
            return False
        try:
            response = self.request.recv(1024)
            if response:
                if response.strip() == b'pong':
                    logging.debug('Client said pong, all good')
                    return True
                #BUGFIX: a non-pong answer used to fall off the end returning None;
                #the caller treated that as failure, so make it an explicit False
                return False
            else:
                logging.debug('No reply')
                return False
        except TimeoutError:
            logging.debug('Timeout')
            return False
        except ConnectionResetError:
            logging.debug('Connection reset, closing socket')
            self.request.close()
            return False
        except OSError as error:
            if str(error) == 'timed out':
                #Client probably just slacks
                logging.debug('Timeout')
            else:
                logging.error('Peer gone?: ' + str(error))
            return False
def socket_server(file_parser_result_queue, overflowqueue, socket_server_signal_queue):
    '''Socket server sending whatever data is in the queue to any client connecting

    file_parser_result_queue -- Queue of parsed events to stream to clients
    overflowqueue            -- Queue holding events that failed to send
    socket_server_signal_queue -- Queue polled for a 'Quit' control message
    '''
    #Multiple connections here is probably a horrible idea:)
    setproctitle('routerstats-collector socket_server')
    #Empty host string means bind on all interfaces
    host, port = '', 9999
    while True:
        try:
            socketserver.TCPServer.allow_reuse_address = True
            socketserver.TCPServer.allow_reuse_port = True
            server = socketserver.TCPServer((host, port), RequestHandler)
            #1 second timeout makes handle_request() return regularly so the
            #signal check below gets a chance to run
            server.timeout = 1
            with server:
                #NOTE(review): this calls set_queue with the CLASS as self, so the
                #queues become class attributes shared by every handler instance
                server.RequestHandlerClass.set_queue(server.RequestHandlerClass, file_parser_result_queue, overflowqueue, socket_server_signal_queue)
                logging.info('Socket up at ' + host + ':' + str(port))
                while True:
                    try:
                        logging.debug('Waiting for request')
                        server.handle_request()
                    except KeyboardInterrupt:
                        logging.debug('Received KeyboardInterrupt')
                        try:
                            server.server_close()
                        except Exception as e:
                            logging.exception(e)
                        return
                    except ValueError:
                        #This seems to happen whenever the socket is closed somewhere else, but handle_request still runs
                        try:
                            server.server_close()
                        except Exception as e:
                            logging.exception(e)
                        #break out to the outer loop, which recreates the server
                        break
                    try:
                        recvd_signal = socket_server_signal_queue.get_nowait()
                        if recvd_signal == 'Quit':
                            logging.debug('Received Quit')
                            server.server_close()
                            return
                    except queue.Empty:
                        pass
        except OSError as error:
            #Bind failed, typically because the old socket is still in TIME_WAIT
            logging.info('Waiting for Address to become available: ' + str(error))
            time.sleep(1)
            continue
def main():
    '''Start the filefetcher and socket server processes and babysit them.

    Restores any previously dumped queue contents and saved file position,
    spawns the two worker processes, then supervises them: reaps children
    that die on their own, and on SIGTERM/KeyboardInterrupt asks the rest to
    quit (killing any that do not comply within 5 seconds).
    '''
    setproctitle('routerstats-collector main-thread')
    logging.debug('Starting as PID ' + str(os.getpid()))
    file_to_follow = 'syslogemu.log'
    #Just quit early if file is missing..
    if os.path.isfile(file_to_follow) is False:
        logging.error('Could not find file ' + file_to_follow)
        sys.exit()
    file_parser_result_queue = Queue()
    file_parser_signal_queue = Queue()
    overflowqueue = Queue()
    socket_server_signal_queue = Queue()
    #Each entry is a (process, its signal queue) pair
    started_processes = []
    load_pickled_file(file_parser_result_queue)
    start_pos = load_start_pos(file_to_follow)
    file_parser_process = Process(target=filefetcher, daemon=True, args=(file_to_follow, file_parser_result_queue, file_parser_signal_queue, 0.5, start_pos))
    file_parser_process.start()
    logging.debug('Started filefetcher as pid ' + str(file_parser_process.pid))
    started_processes.append((file_parser_process, file_parser_signal_queue))
    #We're not writing directly to an rrd, this goes out to a socket.
    #As soon as a client connects to the socket, we send what we have in queue;
    #the client can do whatever it wants with this. This means any "malicious"
    #connections will wipe the history. We're fine with this.
    socket_server_process = Process(target=socket_server, daemon=True, args=(file_parser_result_queue, overflowqueue, socket_server_signal_queue))
    socket_server_process.start()
    logging.debug('Socket server started as pid ' + str(socket_server_process.pid))
    started_processes.append((socket_server_process, socket_server_signal_queue))
    signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:)
    while True:
        try:
            #BUGFIX: the old loop counted the same dead process again on every
            #0.1s pass, so a single dead child pushed the counter past the
            #threshold and we exited while the other child was still healthy.
            #Instead, drop reaped children from the list and exit when it empties.
            still_alive = []
            for proc, sig_queue in started_processes:
                if proc.is_alive():
                    still_alive.append((proc, sig_queue))
                else:
                    logging.error(proc.name + ' has died prematurely?')
                    proc.join()
            started_processes = still_alive
            if not started_processes:
                logging.error('All processes has gone away :/')
                sys.exit()
            time.sleep(0.1)
        except (KeyboardInterrupt, TimeToQuit):
            for proc, sig_queue in started_processes:
                if proc.is_alive():
                    sig_queue.put('Quit')
                    proc.join(timeout=5)
                    if proc.is_alive():
                        logging.error('Timeout waiting for shutdown, killing child PID: ' + str(proc.pid))
                        proc.kill()
            break
#Script entry point
if __name__ == '__main__':
    main()