Move things into directories making more sense for packaging

This commit is contained in:
2024-07-07 16:44:04 +02:00
parent e6f5b7bfa6
commit c0e13f9c8d
4 changed files with 1245 additions and 0 deletions

View File

@ -0,0 +1,17 @@
[DEFAULT]
passwd_file = /usr/local/www/routerstats/passwd.client
rrd_file = /usr/local/www/routerstats/test.rrd
var_dir = /var/cache/routerstats/
port = 9999
[client]
host = remoteserver
[socket_server]
[httpd]
port = 8000
[collector]
logfile = /var/log/ulog/syslogemu.log

399
usr/bin/routerstats_client.py Executable file
View File

@ -0,0 +1,399 @@
#!/usr/bin/env python3
'''Client for the routerstats collection'''
import socket
import logging
import time
import os
import sys
import argparse
import configparser
import rrdtool
logging.basicConfig(
format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
class NotConnected(Exception):
'''Raise to make it known that you are not connected'''
class RouterstatsClient():
'''Client itself'''
def __init__(self, host, port, passwd_file):
self.host = host
self.port = port
self.connected = False
self.sock = None
self.passwd_file = passwd_file
self.received_lines = 0
def connect(self):
'''Do connect'''
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((self.host, self.port))
self.sock.settimeout(1)
logging.info('Connected to %s', self.host)
self.login()
self.connected = True
self.received_lines = 0
except ConnectionRefusedError as error:
logging.error('Could not connect to %s:%s', self.host, self.port)
raise ConnectionRefusedError from error
except ConnectionError as error:
logging.error('Could not connect to %s:%s: %s',
self.host,
self.port,
error)
except TimeoutError as error:
logging.error('Timed out')
def login(self):
'''Do the login routine.
Wait for server to greet us with "hello"
reply with the password
wait for "Welcome"
'''
logging.debug('Logging in')
try:
hello = self.sock.recv(5)
except TimeoutError as exception:
logging.error('Timeout while waiting for Hello')
raise ConnectionError('Timed out waiting for Hello when connecting') from exception
if not hello == b'Hello':
logging.error('No Hello from server: %s', hello)
raise ConnectionError('Server did not greet us with Hello during login')
with open(self.passwd_file, 'r', encoding='utf-8') as passwd_file:
passwd = passwd_file.readline()
passwd = passwd.rstrip()
logging.debug('Sending password: %s', passwd)
self.sock.send(passwd.encode('utf-8'))
try:
response = self.sock.recv(7)
if not response == b'Welcome':
logging.error('Not Welcome: %s', response)
raise ConnectionError('We are not greeted with Welcome after sending password')
return True
except TimeoutError as exception:
raise ConnectionError(
'Timed out while waiting for server to greet us after sending password') from exception
def send(self, tosend):
'''Send some data'''
if self.connected:
logging.debug('Sending %s', tosend)
send_bytes = bytes(tosend + "\n", 'utf-8')
try:
self.sock.send(send_bytes)
except OSError as error:
logging.error('Cound not send to server: %s', error)
self.connected = False
raise NotConnected from error
try:
self.sock.getpeername()
except OSError as error:
logging.error('Could not send to server: %s', error)
self.connected = False
raise NotConnected from error
else:
logging.error('Not connected to server')
raise NotConnected('Not connected to server')
def recv(self):
'''Receive some data'''
if self.connected is not True:
logging.error('Trying to recv when not connected')
raise NotConnected
line = b''
blanks = 0
while True:
try:
toret = self.sock.recv(1)
if toret:
line += toret
if blanks > 0:
blanks -= 1
if line.endswith(b'\n'):
#We're done for now, returning value
line = line.strip().decode('utf-8')
logging.debug('Received from server: %s', line)
self.received_lines += 1
if not self.received_lines % 1000:
logging.info('Received %s lines', self.received_lines)
return line
else:
blanks += 1
if blanks >= 5:
blanks = 0
# break
logging.debug('Too many blank reads, and still no complete line')
self.connected = False
raise NotConnected
except TimeoutError:
if line == b'':
break
logging.error('Timeout while fetching data, got: %s' ,line)
break
except OSError as error:
if str(error) == 'timed out':
#This is expected, as it is how we loop
break
logging.error('OSError: %s', error)
logging.debug('Got: %s', line)
self.connected = False
raise NotConnected(error) from error
return None
def handle_server_msg(received: str):
'''Do something about something from the server'''
try:
timestamp, zone = received.split()
except ValueError:
logging.error('Could not parse %s into two distinct values', received)
return None
try:
timestamp = int(timestamp)
except ValueError:
logging.error('Could not parse %s as an int', timestamp)
return None
toret = {'timestamp': timestamp, 'net_dnat': 0, 'loc-net': 0}
try:
toret[zone] += 1
except KeyError as error:
logging.debug('Ignoring zone: %s', error)
logging.debug('Parsed to: %s', toret)
return toret
def handle(received, client):
'''Do things with whatever came from the server'''
if not received:
return None
if received == 'ping':
client.send('pong')
return True
if received is not None:
if received:
return handle_server_msg(received)
return None
class UpdateRRD:
'''Handle updates to the rrd-file, since we cannot update more than once every second'''
def __init__(self, rrdfile):
self.rrdfile = rrdfile
if os.path.isfile(rrdfile) is not True:
self.create()
self.toupdate = {'timestamp': None, 'net_dnat': 0, 'loc-net': 0}
self.freshdict = self.toupdate.copy()
def create(self):
'''Create self.rrdfile'''
rrdtool.create(
self.rrdfile,
"--start",
"1000000000",
"--step",
"10",
"DS:net_dnat:ABSOLUTE:10:0:U",
"DS:loc-net:ABSOLUTE:10:0:U",
"RRA:AVERAGE:0.5:1:1d",
"RRA:AVERAGE:0.5:1:1M")
logging.debug('Created rrdfile %s', self.rrdfile)
def push(self):
'''Send data to rrdtool'''
#Make sure we're not doing something stupid..
info = rrdtool.info(self.rrdfile)
if self.toupdate['timestamp'] is None:
return False
if info['last_update'] > self.toupdate['timestamp']:
logging.error(
'Trying to update when rrdfile is newer than our timestamp. Ignoring line and resetting.')
self.toupdate = self.freshdict.copy()
return False
if info['last_update'] == self.toupdate['timestamp']:
logging.error('last update and toupdate timestamp are the same, this should not happen')
self.toupdate = self.freshdict.copy()
return False
try:
rrdtool.update(
self.rrdfile,
str(self.toupdate['timestamp'])
+ ':'
+ str(self.toupdate['net_dnat'])
+ ':'
+ str(self.toupdate['loc-net']))
self.toupdate = self.freshdict.copy()
logging.debug('Updated rrdfile')
return True
except rrdtool.OperationalError as error:
if str(error) == 'could not lock RRD':
return False
logging.error(str(error))
return False
def add(self, input_dict):
'''Add data to be added later'''
if self.toupdate['timestamp'] is None:
#Never touched, just overwrite with whatever we got
self.toupdate = input_dict
elif input_dict['timestamp'] > self.toupdate['timestamp']:
#What we get is fresher than what we have, and noone has done a push yet
self.push()
self.toupdate = input_dict
elif input_dict['timestamp'] == self.toupdate['timestamp']:
#Same timestamp, just up values and be happy
self.toupdate['net_dnat'] += input_dict['net_dnat']
self.toupdate['loc-net'] += input_dict['loc-net']
elif input_dict['timestamp'] < self.toupdate['timestamp']:
diff = self.toupdate['timestamp'] - input_dict['timestamp']
if diff <= 5:
#Might be because something is a bit slow sometimes?
input_dict['timestamp'] = self.toupdate['timestamp']
else:
logging.error('Newly fetched data is older than what we have in the queue already. Passing.')
else:
logging.error('Not sure what to do here? %s: %s', input_dict, self.toupdate)
def initial_connect(client):
'''Loop here until first connection is made'''
while True:
try:
client.connect()
break
except ConnectionRefusedError:
time.sleep(1)
def loop(client, rrdupdater):
'''Main loop'''
initial_connect(client)
tries = 0
loops = 0
while True:
try:
retval = handle(client.recv(), client)
if retval:
loops = 0
if retval is True:
pass
else:
rrdupdater.add(retval)
else:
loops += 1
if loops >= 5:
#Want to wait until at least 5 secs have passed since last actual data fetch..
rrdupdater.push()
if loops >= 60:
logging.error(
'No data in 60 seconds. We expect a ping/pong every 30. Lost connection, probably')
loops = 0
raise NotConnected
except NotConnected:
try:
client.connect()
tries = 0
except ConnectionRefusedError as error:
logging.debug('%s', error)
tries += 1
tries = min(tries, 5)
time.sleep(tries)
except ConnectionRefusedError as error:
logging.debug('%s', error)
time.sleep(1)
except KeyboardInterrupt:
break
def main():
'''Main is main'''
config_section = 'client'
config = configparser.ConfigParser()
parser = argparse.ArgumentParser(exit_on_error=False, add_help=False)
parser.add_argument('-c', '--config', help='config file to load')
parser.add_argument('-d', '--debug', action='store_true', help='enable debug')
args, _ = parser.parse_known_args()
if args.debug:
logging.root.setLevel(logging.DEBUG)
logging.debug('Starting as PID %s', os.getpid())
found = False
config_dirs = ('/etc/routerstats/', '/usr/local/etc/routerstats/', '/opt/routerstats/', './')
if args.config:
if os.path.isfile(args.config):
config.read(args.config)
found = True
else:
logging.error('Specified config file does not exist: %s', args.config)
else:
logging.debug('Trying to find config')
#Try to find in "usual" places
for directory in config_dirs:
trytoread = directory + 'routerstats.config'
if os.path.isfile(trytoread):
logging.debug('Reading config file %s', trytoread)
config.read(trytoread)
found = True
if not found:
logging.error('routerstats.config not found in %s', config_dirs)
sys.exit(0)
parser.add_argument(
'-r',
'--host',
dest='host',
help='ip/hostname to connect to',
default=config[config_section]['host'])
parser.add_argument(
'-p',
'--port',
dest='port',
type=int,
help='port to connect to',
default=config[config_section]['port'])
parser.add_argument(
'-v',
'--vardir',
dest='vardir',
help='directory for storing rrd file',
default=config[config_section]['var_dir'])
parser.add_argument(
'-w',
'--pwdfile',
dest='passwd_file',
help='password file',
default=config[config_section]['passwd_file'])
parser.add_argument(
'-h',
'--help',
help='show this help and exit',
action='store_true',
default=False)
args = parser.parse_args()
logging.debug(args)
if args.help:
parser.print_help()
sys.exit()
#Make sure the file specified is to be found..
if not os.path.isfile(args.passwd_file):
logging.error('Cannot find passwd-file %s', args.passwd_file)
sys.exit()
if not os.path.isdir(args.vardir):
logging.error('Cannot find var dir %s', args.vardir)
rrdupdater = UpdateRRD(args.vardir + '/routerstats.rrd')
client = RouterstatsClient(args.host, args.port, args.passwd_file)
loop(client, rrdupdater)
if __name__ == '__main__':
main()

636
usr/bin/routerstats_collector.py Executable file
View File

@ -0,0 +1,636 @@
#!/usr/bin/env python3
'''Collect all the things'''
import signal
import os
import io
import sys
from multiprocessing import Process, Queue
import socketserver
import queue
from datetime import datetime
import logging
import time
import pickle
import configparser
import argparse
from setproctitle import setproctitle
logging.basicConfig(
format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
class TimeToQuit(Exception):
'''Used to pass Quit to subthreads'''
class ReloadLog(Exception):
'''Used to reload log file'''
def wait_for_file(filename: str):
'''Busy loop waiting for file to exist, return return from open when file is opened'''
loops = 0
logging.debug('Trying to open file %s', filename)
while True:
try:
input_file = open(filename, 'r', encoding='utf-8')
return input_file
except FileNotFoundError:
logging.debug('%s not found, sleeping and retrying', filename)
loops += 1
loops = min(loops, 10)
time.sleep(loops)
return None
def fetch_line_from_file(filehandle):
'''Loop until we have formed a complete line from filehandle'''
line = ''
while True:
tmp = filehandle.readline()
if tmp is not None and tmp != '':
line += tmp
if line.endswith("\n"):
line.rstrip()
if line.isspace():
logging.debug('Empty line is empty, thank you for the newline')
else:
return line
line = ''
else:
logging.debug('readline reported line with no \n?')
else:
#Either readline returned None, or ''
return None
def filefetcher(
filename: str,
output_directory: str,
collector_queue: Queue,
signal_queue: Queue,
sleep_sec=0.5,
seek_pos=None):
'''Latch onto a file, putting any new lines onto the queue.'''
setproctitle('routerstats-collector file-fetcher')
if float(sleep_sec) <= 0.0:
logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)')
return False
while True:
try:
input_file = wait_for_file(filename)
if not input_file:
#Something is very wrong, the above function should only return
#whenever it has opened a file
logging.critical('Failed to open %s', filename)
break
start_stat = os.stat(filename)
if seek_pos is None:
cur_pos = input_file.seek(0, 0)
else:
cur_pos = input_file.seek(seek_pos, io.SEEK_SET)
logging.info(
'Following %s (inode %s) from pos %s', filename, start_stat.st_ino, cur_pos)
while True:
line = fetch_line_from_file(input_file)
if line:
logging.debug('Parsing line ending at pos %s', input_file.tell())
parse_and_queue_line(line, collector_queue)
start_stat = os.stat(filename)
cur_pos = input_file.tell()
else:
#Using got_signal with a timeout of sleep_sec to rate limit the loopiness of this loop:)
any_signal = got_signal(signal_queue, sleep_sec)
if any_signal == 'Quit':
shutdown_filefetcher(collector_queue, input_file, output_directory)
logging.critical('Shutdown filefetcher')
return True
now_stat = os.stat(filename)
if now_stat.st_ino != start_stat.st_ino:
#Inode has changed
#implicit break at the bottom
#the following if definitions are just to see if we want to reset seek_pos or not
if now_stat.st_ctime == start_stat.st_ctime:
#Strange, inode has changed, but ctime (creation_time) is the same?
if now_stat.st_size >= start_stat.st_size:
#Size is greater than or equal to last round
logging.warning(
'New inode number, but same ctime?'
'Not sure how to handle this. Reopening, but keeping seek position...')
else:
logging.warning(
'New inode number, same ctime, but smaller? Much confuse, starting from beginning..')
seek_pos = 0
else:
logging.debug('File have new inode number, restarting read from start')
seek_pos = 0
break
if now_stat.st_size < start_stat.st_size:
logging.debug('File is now smaller than last read, restarting from start')
seek_pos = 0
break
except KeyboardInterrupt:
shutdown_filefetcher(collector_queue, input_file, output_directory)
logging.debug('KeyboardInterrupt, closing file and quitting')
return False
except FileNotFoundError:
logging.debug('File gone away')
#Save progress, and hope it returns some day:)
seek_pos = save_current_pos(input_file, output_directory)
def save_current_pos(input_file, output_directory):
'''As it say on the tin'''
cur_pos = input_file.tell()
input_file.close()
with open(output_directory + 'position', 'w', encoding='utf-8') as output_file:
logging.debug('Saving current position %s to %s', cur_pos, output_directory + 'position')
output_file.write(str(cur_pos))
return cur_pos
def shutdown_filefetcher(output_queue, input_file, output_directory):
'''Cleanly close filehandles, save log position and queue contents'''
save_current_pos(input_file, output_directory)
dump_queue(output_queue, output_directory + 'dump.pickle')
def got_signal(signal_queue: Queue, sleep_sec: float):
'''Read from signal_queue with a timeout of sleep_sec,
returns either a signal name (whatever text string the queue gave us), or None'''
try:
any_signal = signal_queue.get(timeout=sleep_sec)
logging.critical('Got %s', any_signal)
return any_signal
except queue.Empty:
return None
def parse_and_queue_line(line: str, collector_queue):
'''Parse and queue the result'''
to_ret = parse_line(line)
if to_ret:
collector_queue.put(to_ret)
def parse_line(input_line: str) -> dict:
'''Fetch fourth space-separated item as zone, prepend with unix timestamp from the 3 first items'''
#Input line: Month DayOfMonth HH:MM:SS hostname zone....
#The input line is in the local timezone
#Seems like python known about our timezone, so when converting to unix timestamp from random text
#it should handle changes from CET to CEST
try:
input_line = input_line.split("\n")[0] #No idea why it appends a newline if I don't do this?
space_split = input_line.split()
month = space_split[0]
dateint = space_split[1]
timestr = space_split[2]
zone = space_split[4]
now = datetime.now()
except IndexError:
return None
try:
logline_time = datetime.strptime(
str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr),
'%Y %b %d %H:%M:%S')
except ValueError:
logging.error('Could not parse line %s', input_line)
return None
#If this is in the future, this probably means the data is from last year
if logline_time > now:
#We're in the future. Prettu sure the future is not yet, we're in last year
#This might happen around first of january
logline_time = logline_time.replace(year=logline_time.year - 1)
timestamp = int(logline_time.timestamp())
#FIXME Make this configurable somehow
if zone in ['loc-net', 'router-net']:
zone = 'loc-net'
retval = (timestamp, zone)
logging.debug('Parsed line to %s', retval)
return retval
def dump_queue(queue_to_dump: Queue, dumpfile):
'''Write the contents of a queue to a list that we pickle to a file'''
#We use pickle, every entry in the queue is one entry in a list
if queue_to_dump.empty():
#Nothing to save, nothing to do
logging.debug('Empty queue')
return
out_list = []
while True:
try:
out_list.append(queue_to_dump.get_nowait())
except queue.Empty:
break
if out_list:
logging.debug('Saving %s entries to %s', len(out_list), dumpfile)
to_save = pickle.dumps(out_list)
with open(dumpfile, 'wb') as output_file:
bytes_written = output_file.write(to_save)
logging.debug('Saved %s entries, taking %s bytes', len(out_list), bytes_written)
def signal_handler(signum, _):
'''Handle signals in a sensible way, I guess?'''
if signum == signal.SIGTERM:
logging.critical('Asked to quit')
raise TimeToQuit('Received signal ' + signal.Signals(signum).name)
def load_pickled_file(output_queue, loadfile):
'''Load queue contents from pickled queue structure'''
#Does our dump file exist?
if os.path.isfile(loadfile):
size = os.stat(loadfile).st_size
logging.debug('%s exists, loading %s bytes.', loadfile, size)
#This is already parsed lines, dump them straight into the file_parser_result_queue
#Saved format is [(timestamp, parseresult), ..]
with open(loadfile, 'rb') as input_file:
#Yes, I know, loads is unsafe:)
loaded_data = pickle.loads(input_file.read())
for entry in loaded_data:
output_queue.put(entry)
logging.debug('Put %s entries on the queue', len(loaded_data))
logging.debug('Deleting old dump')
os.unlink(loadfile)
def load_start_pos(logfile, position_file):
'''Read start position from file, if it exists'''
#Do we have any position we want to start from?
if os.path.isfile(position_file):
with open(position_file, 'r', encoding='utf-8') as input_file:
tmp_start_pos = input_file.readline()
try:
tmp_start_pos = int(tmp_start_pos)
except ValueError:
logging.error('Could not parse %s as an integer', tmp_start_pos)
return None
logging.debug('Loaded position %s', tmp_start_pos)
size = os.stat(logfile).st_size
logging.debug('log file size is %s', size)
if tmp_start_pos <= size:
return tmp_start_pos
return None
class RequestHandler(socketserver.BaseRequestHandler):
'''derived BaseRequestHandler'''
def set_passwd_file(self, filename):
'''Make us able to set attributes on derived classes'''
self.passwd_file = filename #pylint: disable=attribute-defined-outside-init
def check_login(self, answer):
'''Check if what client say is the password matches against passwd_file contents'''
with open(self.passwd_file, 'r', encoding='utf-8') as passwd_file:
passwd = passwd_file.readline()
passwd = passwd.rstrip() #Remove that newline
try:
answer = answer.decode('utf-8')
except UnicodeDecodeError as error:
logging.error('Could not decode %s as unicode: %s', answer, str(error))
if answer == passwd:
return True
return False
def login(self):
'''Run login procedure'''
try:
self.request.send(b'Hello')
try:
answer = self.request.recv(1024)
except TimeoutError:
#Client did not even bother to reply...
logging.warning('Timed out during auth')
self.request.send(b'timeout')
return None
if not self.check_login(answer):
logging.warning('Wrong passphrase')
self.request.send(b'auth error')
return None
self.request.send(b'Welcome')
logging.info('Client %s logged in', self.client_address[0])
return True
except BrokenPipeError:
#Client gone and came back, bad idea.
logging.warning('Broken pipe, closing socket')
return False
except ConnectionResetError:
#Other end closed socket, we're ok
logging.warning('Connection reset by peer')
return False
return None
def handle(self):
logging.info('Connected to %s', self.client_address[0])
self.sendt_lines = 0
self.request.settimeout(5)
start_time = datetime.now()
if not self.login():
self.request.close()
return
while True:
try:
if self.overflowqueue.empty is not True:
while True:
tosend = self.overflowqueue.get_nowait()
if tosend:
logging.debug('Sending %s from overflowqueue', tosend)
self.send(tosend)
except queue.Empty:
pass
try:
if self.input_queue.empty is not True:
event = self.input_queue.get(timeout=1)
tosend = str(event[0]) + ' ' + event[1]
try:
self.send(tosend)
except (BrokenPipeError, ConnectionResetError, TimeoutError) as error:
logging.error('Client gone: %s', error)
self.overflowqueue.put(tosend)
break
try:
_ = self.request.getpeername()
except OSError:
logging.error('Client gone')
self.overflowqueue.put(tosend)
break
start_time = datetime.now()
except queue.Empty:
pass
try:
rcvd_signal = self.signal_queue.get_nowait()
logging.debug('Signal: %s', rcvd_signal)
if rcvd_signal == 'Quit':
logging.info('Asked to quit')
break
except queue.Empty:
pass
now_time = datetime.now()
diff_time = now_time - start_time
if diff_time.total_seconds() >= 30:
start_time = now_time
#Long time, no see, time to pingpong the client:)
if self.ping_client() is not True:
break
logging.info('Request abandoned')
def send(self, tosend):
'''Wrap sendall'''
logging.debug('Sending %s', tosend)
self.request.sendall(bytes(tosend + "\n", 'utf-8'))
self.sendt_lines += 1
if not self.sendt_lines % 1000:
logging.info('Sendt %s lines', self.sendt_lines)
def set_queue(self, input_queue, overflowqueue, signal_queue):
'''Set Queue for fetching events'''
self.input_queue = input_queue #pylint: disable=attribute-defined-outside-init
self.overflowqueue = overflowqueue #pylint: disable=attribute-defined-outside-init
self.signal_queue = signal_queue #pylint: disable=attribute-defined-outside-init
def ping_client(self):
'''Send ping to a client, expect pong back, else close socket'''
try:
self.send('ping')
except BrokenPipeError:
#Someone closed our socket
logging.debug('Broken pipe')
return False
try:
response = self.request.recv(1024)
if response:
if response.strip() == b'pong':
logging.debug('Client said pong, all good')
return True
else:
logging.debug('No reply')
except TimeoutError:
logging.debug('Timeout')
except ConnectionResetError:
logging.debug('Connection reset, closing socket')
self.request.close()
except OSError as error:
if str(error) == 'timed out':
#Client probably just slacks
logging.debug('Timeout')
else:
logging.error('Peer gone?: %s', error)
return False
def socket_server(
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue,
passwd_file,
server_port):
'''Socket server sending whatever data is in the queue to any client connecting'''
#Multiple connections here is probably a horrible idea:)
setproctitle('routerstats-collector socket_server')
host, port = '', server_port
while True:
try:
socketserver.TCPServer.allow_reuse_address = True
socketserver.TCPServer.allow_reuse_port = True
server = socketserver.TCPServer((host, port), RequestHandler)
server.timeout = 1
with server:
server.RequestHandlerClass.set_queue(
server.RequestHandlerClass,
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue)
server.RequestHandlerClass.set_passwd_file(server.RequestHandlerClass, passwd_file)
logging.info('Socket up at %s:%s', host, port)
while True:
try:
server.handle_request()
except KeyboardInterrupt:
logging.debug('Received KeyboardInterrupt')
server.server_close()
return
except ValueError:
#This seems to happen whenever the socket is closed somewhere else,
#but handle_request still runs
server.server_close()
break
try:
recvd_signal = socket_server_signal_queue.get_nowait()
if recvd_signal == 'Quit':
logging.warning('Shutting down socketserver')
server.server_close()
return
except queue.Empty:
pass
except OSError as error:
logging.info('Waiting for Address to become available: %s', error)
time.sleep(1)
continue
def main():
'''Main thingy'''
config_section = 'collector'
setproctitle('routerstats-collector main-thread')
config = configparser.ConfigParser()
parser = argparse.ArgumentParser(exit_on_error=False, add_help=False)
parser.add_argument('-c', '--config', help='config file to load')
parser.add_argument('-d', '--debug', action='store_true', help='enable debug')
args, _ = parser.parse_known_args()
if args.debug:
logging.root.setLevel(logging.DEBUG)
logging.debug('Starting as PID %s' ,os.getpid())
found = False
config_dirs = ('/etc/routerstats/', '/usr/local/etc/routerstats/', '/opt/routerstats/', './')
if args.config:
if os.path.isfile(args.config):
config.read(args.config)
found = True
else:
logging.error('Specified config file does not exist: %s', args.config)
else:
logging.debug('Trying to find config')
#Try to find in "usual" places
for directory in config_dirs:
trytoread = directory + 'routerstats.config'
if os.path.isfile(trytoread):
logging.debug('Reading config file %s', trytoread)
config.read(trytoread)
found = True
if not found:
logging.error('routerstats.config not found in %s', config_dirs)
sys.exit(0)
parser.add_argument(
'-f',
'--file',
dest='file_to_follow',
help='Log file to follow',
default=config[config_section]['logfile'])
parser.add_argument(
'-w',
'--pwdfile',
dest='passwd_file',
help='password file',
default=config[config_section]['passwd_file'])
parser.add_argument(
'-p',
'--port',
dest='server_port',
type=int,
help='tcp port to listen to',
default=config[config_section]['port'])
parser.add_argument(
'-v',
'--vardir',
dest='var_dir',
help='Location for queue dumps',
default=config[config_section]['var_dir'])
parser.add_argument(
'-h',
'--help',
help='show this help and exit',
action='store_true',
default=False
)
args = parser.parse_args()
logging.debug(args)
if args.help:
parser.print_help()
sys.exit()
#Just quit early if file is missing..
if os.path.isfile(args.file_to_follow) is False:
logging.error('Could not find log file %s', args.file_to_follow)
sys.exit()
if not os.path.isfile(args.passwd_file):
logging.error('Could not find password file %s', args.passwd_file)
sys.exit()
if not args.server_port:
logging.error('No TPC port to bind to')
sys.exit()
if not os.path.isdir(args.var_dir):
logging.error('Could not find var dir %s', args.var_dir)
sys.exit()
file_parser_result_queue = Queue()
file_parser_signal_queue = Queue()
overflowqueue = Queue()
socket_server_signal_queue = Queue()
started_processes = []
load_pickled_file(file_parser_result_queue, args.var_dir + '/dump.pickle')
start_pos = load_start_pos(args.file_to_follow, args.var_dir + '/position')
file_parser_process = Process(
target=filefetcher,
daemon=True,
args=(
args.file_to_follow,
args.var_dir,
file_parser_result_queue,
file_parser_signal_queue,
0.5,
start_pos))
file_parser_process.start()
logging.debug('Started filefetcher as pid %s', file_parser_process.pid)
started_processes.append((file_parser_process, file_parser_signal_queue))
#We're not writing directly to an rrd,
#This goes out to a socket
#As soon as a client connects to the socket, we send what we have in queue
#Client can do whatever it wants with this
#This means any "malicious" connections will wipe the history
#We're fine with this
socket_server_process = Process(
target=socket_server,
daemon=True,
args=(
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue,
args.passwd_file,
args.server_port))
socket_server_process.start()
logging.debug('Socket server started as pid %s', socket_server_process.pid)
started_processes.append((socket_server_process, socket_server_signal_queue))
signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:)
signal.signal(signal.SIGINT, signal_handler)
#No idea how to manage this better?
while True:
try:
for p in started_processes:
if p[0].is_alive():
pass
else:
logging.error('%s has died prematurely?', p[0].name)
p[0].join()
raise TimeToQuit
time.sleep(0.5)
except (KeyboardInterrupt, TimeToQuit):
logging.warning('Asked to quit via exception TimeToQuit')
for p in started_processes:
if p[0].is_alive():
p[1].put('Quit')
p[0].join(timeout=5)
if p[0].is_alive():
logging.error('Timeout waiting for shutdown, killing child PID: %s', p[0].pid)
p[0].kill()
break
if __name__ == '__main__':
main()

193
usr/bin/routerstats_httpd.py Executable file
View File

@ -0,0 +1,193 @@
#!/usr/bin/env python3
'''httpd component of routerstats'''
import logging
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
import sys
import os
from functools import partial
import argparse
import configparser
import rrdtool
logging.basicConfig(
format='%(asctime)s %(funcName)20s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
class RequestHandler(SimpleHTTPRequestHandler):
'''Extended SimpleHTTPRequestHandler'''
def serve_png(self):
'''Give a client a png file'''
self.send_response(200)
self.send_header('Content-type', 'image/png')
self.end_headers()
tmp_queries = self.path.split('/')
query = []
for q in tmp_queries:
if q:
query.append(q)
logging.debug(str(query))
retries = 0
while True:
try:
if len(query) > 1:
if query[1] == 'yearly':
title = query[1]
startstr = '--start=end-1Y'
elif query[1] == 'monthly':
title = query[1]
startstr = '--start=end-1M'
elif query[1] == 'weekly':
title = query[1]
startstr = '--start=end-1W'
elif query[1] == 'daily':
title = query[1]
startstr = '--start=end-1d'
else:
title = 'monthly'
startstr = '--start=end-1M'
else:
title = 'monthly'
startstr = '--start=end-1M'
data = rrdtool.graphv(
"-",
startstr,
"--title=" + title,
"DEF:in=" + self.rrdfile + ":net_dnat:AVERAGE", #pylint: disable=no-member
"DEF:out=" + self.rrdfile + ":loc-net:AVERAGE", #pylint: disable=no-member
"CDEF:result_in=in,UN,0,in,IF",
"CDEF:tmp_out=out,UN,0,out,IF",
"CDEF:result_out=tmp_out,-1,*",
"AREA:result_in#00ff00:in",
"AREA:result_out#0000ff:out")
#, "--width", "1024", "--height", "600"
self.wfile.write(data['image'])
break
except rrdtool.OperationalError as error:
retries += 1
if retries > 10:
logging.error('Could not graphv the data: %s', error)
break
def errorpage(self, errorcode, errormsg: str = ''):
'''Give client an errorpage'''
self.send_response(errorcode)
self.send_header('Content-type', 'text/plain')
self.end_headers()
if errormsg:
self.wfile.write(errormsg.encode('utf-8'))
def do_GET(self):
'''Reply to a GET request'''
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.print_page()
elif self.path.startswith('/graph'):
self.serve_png()
elif self.path == '/favicon.ico':
self.send_error(404)
else:
self.send_error(404, str(self.path))
def print_page(self):
'''Print pretty html page'''
self.wfile.write("<html><body>".encode('utf-8'))
self.wfile.write('<img src="/graph/monthly">'.encode('utf-8'))
self.wfile.write('<img src="/graph/weekly">'.encode('utf-8'))
self.wfile.write('<img src="/graph/daily">'.encode('utf-8'))
self.wfile.write("</body></html>".encode('utf-8'))
class CustomHandler(RequestHandler):
'''wrapping the wrapped one, so we can wrap in an init'''
def __init__(self, rrdfile, *args, **kwargs):
self.rrdfile = rrdfile
super().__init__(*args, **kwargs)
def server(rrdfile, port):
'''Start server'''
server_address = ('', port)
server_class = ThreadingHTTPServer
handler_class = partial(CustomHandler, rrdfile)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()
def main():
'''This be main'''
config_section = 'httpd'
config = configparser.ConfigParser()
parser = argparse.ArgumentParser(exit_on_error=False, add_help=False)
parser.add_argument('-c', '--config', help='config file to load')
parser.add_argument('-d', '--debug', action='store_true', help='enable debug')
args, _ = parser.parse_known_args()
if args.debug:
logging.root.setLevel(logging.DEBUG)
logging.debug('Starting as PID %s', os.getpid())
found = False
config_dirs = ('/etc/routerstats/', '/usr/local/etc/routerstats/', '/opt/routerstats/', './')
if args.config:
if os.path.isfile(args.config):
config.read(args.config)
found = True
else:
logging.error('Specified config file does not exist: %s', args.config)
else:
logging.debug('Trying to find config')
#Try to find in "usual" places
for directory in config_dirs:
trytoread = directory + 'routerstats.config'
if os.path.isfile(trytoread):
logging.debug('Reading config file %s', trytoread)
config.read(trytoread)
found = True
if not found:
logging.error('routerstats.config not found in %s', config_dirs)
sys.exit(0)
parser.add_argument(
'-v',
'--vardir',
help='directory storing rrd file',
default=config[config_section]['var_dir'])
parser.add_argument(
'-p',
'--port',
help='port to bind to',
type=int,
default=config[config_section]['port'])
parser.add_argument(
'-h',
'--help',
help='show this help and exit',
action='store_true',
default=False)
args = parser.parse_args()
logging.debug(args)
if args.help:
parser.print_help()
sys.exit()
if not os.path.isfile(args.vardir + 'routerstats.rrd'):
logging.error('Cannot find rrd file %s', args.vardir + 'routerstats.rrd' )
sys.exit()
server(rrdfile=args.vardir + 'routerstats.rrd', port=args.port)
if __name__ == '__main__':
logging.debug('Welcome')
main()