Amend most pylint complaints
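
Most of the fixes are mechanical: logging calls switched to lazy %-interpolation, open() given an explicit encoding, over-long signatures and calls wrapped, unused names replaced with _, missing docstrings added, and the remaining attribute-defined-outside-init warnings silenced inline. A minimal before/after sketch of the two most common patterns, built from lines in filefetcher() (the check names logging-not-lazy and unspecified-encoding are assumed here, not quoted from pylint's output):

# before: '+' builds the message string even when DEBUG is disabled,
# and open() falls back to the platform default encoding
input_file = open(filename, 'r')
logging.debug('Retry opening ' + filename)

# after: lazy %-style arguments and an explicit encoding
input_file = open(filename, 'r', encoding='utf-8')
logging.debug('Retry opening %s', filename)

With lazy interpolation the logging module only formats the message if the record is actually emitted, so debug-level lines cost little when debug logging is off.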

2024-07-03 19:27:24 +02:00
parent dcbdf4feca
commit c72fa2f5d4


@@ -1,4 +1,6 @@
#!/usr/bin/env python3
'''Collect all the things'''
import signal
import os
@@ -23,13 +25,17 @@ logging.basicConfig(
class TimeToQuit(Exception):
'''Used to pass Quit to subthreads'''
pass
class ReloadLog(Exception):
'''Used to reload log file'''
pass
def filefetcher(filename: str, output_directory: str, collector_queue, signal_queue, sleep_sec=0.5, seek_pos=None):
def filefetcher(
filename: str,
output_directory: str,
collector_queue: Queue,
signal_queue: Queue,
sleep_sec=0.5,
seek_pos=None):
'''Latch onto a file, putting any new lines onto the queue.'''
setproctitle('routerstats-collector file-fetcher')
if float(sleep_sec) <= 0.0:
@@ -37,13 +43,12 @@ def filefetcher(filename: str, output_directory: str, collector_queue, signal_qu
return False
while True:
try:
input_file = open(filename, 'r')
input_file = open(filename, 'r', encoding='utf-8')
except FileNotFoundError:
logging.debug('Retry opening ' + filename)
logging.debug('Retry opening %s', filename)
time.sleep(1)
continue
retries = 0
line = ''
start_stat = os.stat(filename)
if seek_pos is None:
@@ -51,7 +56,8 @@ def filefetcher(filename: str, output_directory: str, collector_queue, signal_qu
else:
cur_pos = input_file.seek(seek_pos, io.SEEK_SET)
logging.info('Following ' + filename + ' (inode ' + str(start_stat.st_ino) + ') from pos ' + str(cur_pos))
logging.info(
'Following %s (inode %s) from pos %s', filename, start_stat.st_ino, cur_pos)
try:
while True:
@@ -63,7 +69,7 @@ def filefetcher(filename: str, output_directory: str, collector_queue, signal_qu
if line.isspace():
logging.debug('Empty line is empty, thank you for the newline')
else:
logging.debug('Parsing line ending at pos ' + str(input_file.tell()))
logging.debug('Parsing line ending at pos %s', input_file.tell())
parse_and_queue_line(line, collector_queue)
line = ''
start_stat = os.stat(filename)
@@ -82,9 +88,12 @@ def filefetcher(filename: str, output_directory: str, collector_queue, signal_qu
if now_stat.st_ctime == start_stat.st_ctime:
#Strange, inode has changed, but ctime is the same?
if now_stat.st_size >= start_stat.st_size:
logging.warning('New inode number, but same ctime? Not sure how to handle this. Reopening, but keeping seek position...')
logging.warning(
'New inode number, but same ctime?'
'Not sure how to handle this. Reopening, but keeping seek position...')
else:
logging.warning('New inode number, same ctime, but smaller? Much confuse, starting from beginning..')
logging.warning(
'New inode number, same ctime, but smaller? Much confuse, starting from beginning..')
seek_pos = 0
else:
logging.debug('File have new inode number, restarting read from start')
@@ -99,16 +108,14 @@ def filefetcher(filename: str, output_directory: str, collector_queue, signal_qu
logging.debug('KeyboardInterrupt, closing file and quitting')
return False
except FileNotFoundError:
'''Input file gone-gone during loop, retry opening a few times'''
logging.debug('File gone away')
next
def shutdown_filefetcher(output_queue, input_file, output_directory):
'''Cleanly close filehandles, save log position and queue contents'''
cur_pos = input_file.tell()
input_file.close()
with open(output_directory + 'position', 'w') as output_file:
logging.debug('Saving current position ' + str(cur_pos))
with open(output_directory + 'position', 'w', encoding='utf-8') as output_file:
logging.debug('Saving current position %s', cur_pos)
output_file.write(str(cur_pos))
dump_queue(output_queue, output_directory + 'dump.pickle')
@@ -117,7 +124,7 @@ def got_signal(signal_queue: Queue, sleep_sec: float):
returns either a signal name (whatever text string the queue gave us), or None'''
try:
any_signal = signal_queue.get(timeout=sleep_sec)
logging.critical('Got ' + any_signal)
logging.critical('Got %s', any_signal)
return any_signal
except queue.Empty:
return None
@@ -145,9 +152,11 @@ def parse_line(input_line: str) -> dict:
except IndexError:
return None
try:
logline_time = datetime.strptime(str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr), '%Y %b %d %H:%M:%S')
logline_time = datetime.strptime(
str(now.year) + ' ' + month + ' ' + str(dateint) + ' ' + str(timestr),
'%Y %b %d %H:%M:%S')
except ValueError:
logging.error('Could not parse line ' + str(input_line))
logging.error('Could not parse line %s', input_line)
return None
#If this is in the future, this probably means the data is from last year
if logline_time > now:
@@ -159,7 +168,7 @@ def parse_line(input_line: str) -> dict:
if zone in ['loc-net', 'router-net']:
zone = 'loc-net'
retval = (timestamp, zone)
logging.debug('Parsed line to ' + str(retval))
logging.debug('Parsed line to %s', retval)
return retval
def dump_queue(queue_to_dump: Queue, dumpfile):
@@ -176,13 +185,13 @@ def dump_queue(queue_to_dump: Queue, dumpfile):
except queue.Empty:
break
if out_list:
logging.debug('Saving ' + str(len(out_list)) + ' entries to ' + str(dumpfile))
logging.debug('Saving %s entries to %s', len(out_list), dumpfile)
to_save = pickle.dumps(out_list)
with open(dumpfile, 'wb') as output_file:
bytes_written = output_file.write(to_save)
logging.debug('Saved ' + str(len(out_list)) + ' entries, taking ' + str(bytes_written) + ' bytes')
logging.debug('Saved %s entries, taking %s bytes', len(out_list), bytes_written)
def signal_handler(signum, frame):
def signal_handler(signum, _):
'''Handle signals in a sensible way, I guess?'''
if signum == signal.SIGTERM:
logging.critical('Asked to quit')
@@ -193,7 +202,7 @@ def load_pickled_file(output_queue, loadfile):
#Does our dump file exist?
if os.path.isfile(loadfile):
size = os.stat(loadfile).st_size
logging.debug(loadfile + ' exists, loading ' + str(size) + ' bytes.')
logging.debug('%s exists, loading %s bytes.', loadfile, size)
#This is already parsed lines, dump them straight into the file_parser_result_queue
#Saved format is [(timestamp, parseresult), ..]
with open(loadfile, 'rb') as input_file:
@@ -201,7 +210,7 @@ def load_pickled_file(output_queue, loadfile):
loaded_data = pickle.loads(input_file.read())
for entry in loaded_data:
output_queue.put(entry)
logging.debug('Put ' + str(len(loaded_data)) + ' entries on the queue')
logging.debug('Put %s entries on the queue', len(loaded_data))
logging.debug('Deleting old dump')
os.unlink(loadfile)
@@ -209,16 +218,16 @@ def load_start_pos(logfile, position_file):
'''Read start position from file, if it exists'''
#Do we have any position we want to start from?
if os.path.isfile(position_file):
with open(position_file, 'r') as input_file:
with open(position_file, 'r', encoding='utf-8') as input_file:
tmp_start_pos = input_file.readline()
try:
tmp_start_pos = int(tmp_start_pos)
except ValueError:
logging.error('Could not parse ' + str(tmp_start_pos) + ' as an integer')
logging.error('Could not parse %s as an integer', tmp_start_pos)
return None
logging.debug('Loaded position ' + str(tmp_start_pos))
logging.debug('Loaded position %s', tmp_start_pos)
size = os.stat(logfile).st_size
logging.debug('log file size is ' + str(size))
logging.debug('log file size is %s', size)
if tmp_start_pos <= size:
return tmp_start_pos
return None
@@ -226,10 +235,12 @@ def load_start_pos(logfile, position_file):
class RequestHandler(socketserver.BaseRequestHandler):
'''derived BaseRequestHandler'''
def set_passwd_file(self, filename):
self.passwd_file = filename
'''Make us able to set attributes on derived classes'''
self.passwd_file = filename #pylint: disable=attribute-defined-outside-init
def check_login(self, answer):
with open(self.passwd_file, 'r') as passwd_file:
'''Check if what client say is the password matches against passwd_file contents'''
with open(self.passwd_file, 'r', encoding='utf-8') as passwd_file:
passwd = passwd_file.readline()
passwd = passwd.rstrip() #Remove that newline
try:
@@ -241,6 +252,7 @@ class RequestHandler(socketserver.BaseRequestHandler):
return False
def login(self):
'''Run login procedure'''
try:
self.request.send(b'Hello')
try:
@@ -249,13 +261,13 @@ class RequestHandler(socketserver.BaseRequestHandler):
#Client did not even bother to reply...
logging.warning('Timed out during auth')
self.request.send(b'timeout')
return
return None
if not self.check_login(answer):
logging.warning('Wrong passphrase')
self.request.send(b'auth error')
return
return None
self.request.send(b'Welcome')
logging.info('Client ' + str(self.client_address[0]) + ' logged in')
logging.info('Client %s logged in', self.client_address[0])
return True
except BrokenPipeError:
#Client gone and came back, bad idea.
@@ -265,9 +277,10 @@ class RequestHandler(socketserver.BaseRequestHandler):
#Other end closed socket, we're ok
logging.warning('Connection reset by peer')
return False
return None
def handle(self):
logging.info('Connected to ' + str(self.client_address[0]))
logging.info('Connected to %s', self.client_address[0])
self.request.settimeout(5)
start_time = datetime.now()
if not self.login():
@@ -275,27 +288,27 @@ class RequestHandler(socketserver.BaseRequestHandler):
return
while True:
try:
if self.overflowqueue.empty != True:
if self.overflowqueue.empty is not True:
while True:
tosend = self.overflowqueue.get_nowait()
if tosend:
logging.debug('Sending ' + str(tosend) + ' from overflowqueue')
logging.debug('Sending %s from overflowqueue', tosend)
self.send(tosend)
except queue.Empty:
pass
try:
if self.input_queue.empty != True:
if self.input_queue.empty is not True:
event = self.input_queue.get(timeout=1)
tosend = str(event[0]) + ' ' + event[1]
try:
self.send(tosend)
except (BrokenPipeError, ConnectionResetError, TimeoutError) as error:
logging.error('Client gone: ' + str(error))
logging.error('Client gone: %s', error)
self.overflowqueue.put(tosend)
break
try:
peer = self.request.getpeername()
except OSError as error:
_ = self.request.getpeername()
except OSError:
logging.error('Client gone')
self.overflowqueue.put(tosend)
break
@@ -304,9 +317,9 @@ class RequestHandler(socketserver.BaseRequestHandler):
pass
try:
signal = self.signal_queue.get_nowait()
logging.debug('Signal: ' + str(signal))
if signal == 'Quit':
rcvd_signal = self.signal_queue.get_nowait()
logging.debug('Signal: %s', rcvd_signal)
if rcvd_signal == 'Quit':
logging.info('Asked to quit')
break
except queue.Empty:
@@ -317,20 +330,20 @@ class RequestHandler(socketserver.BaseRequestHandler):
if diff_time.total_seconds() >= 30:
start_time = now_time
#Long time, no see, time to pingpong the client:)
if self.ping_client() != True:
if self.ping_client() is not True:
break
logging.info('Request abandoned')
def send(self, tosend):
'''Wrap sendall'''
logging.debug('Sending ' + str(tosend))
logging.debug('Sending %s', tosend)
self.request.sendall(bytes(tosend + "\n", 'utf-8'))
def set_queue(self, input_queue, overflowqueue, signal_queue):
'''Set Queue for fetching events'''
self.input_queue = input_queue
self.overflowqueue = overflowqueue
self.signal_queue = signal_queue
self.input_queue = input_queue #pylint: disable=attribute-defined-outside-init
self.overflowqueue = overflowqueue #pylint: disable=attribute-defined-outside-init
self.signal_queue = signal_queue #pylint: disable=attribute-defined-outside-init
def ping_client(self):
'''Send ping to a client, expect pong back, else close socket'''
@@ -349,24 +362,26 @@ class RequestHandler(socketserver.BaseRequestHandler):
return True
else:
logging.debug('No reply')
return False
except TimeoutError:
logging.debug('Timeout')
return False
except ConnectionResetError:
logging.debug('Connection reset, closing socket')
self.request.close()
return False
except OSError as error:
if str(error) == 'timed out':
#Client probably just slacks
logging.debug('Timeout')
else:
logging.error('Peer gone?: ' + str(error))
return False
logging.error('Peer gone?: %s', error)
return False
def socket_server(file_parser_result_queue, overflowqueue, socket_server_signal_queue, passwd_file, server_port):
def socket_server(
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue,
passwd_file,
server_port):
'''Socket server sending whatever data is in the queue to any client connecting'''
#Multiple connections here is probably a horrible idea:)
setproctitle('routerstats-collector socket_server')
@@ -378,26 +393,25 @@ def socket_server(file_parser_result_queue, overflowqueue, socket_server_signal_
server = socketserver.TCPServer((host, port), RequestHandler)
server.timeout = 1
with server:
server.RequestHandlerClass.set_queue(server.RequestHandlerClass, file_parser_result_queue, overflowqueue, socket_server_signal_queue)
server.RequestHandlerClass.set_queue(
server.RequestHandlerClass,
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue)
server.RequestHandlerClass.set_passwd_file(server.RequestHandlerClass, passwd_file)
logging.info('Socket up at ' + host + ':' + str(port))
logging.info('Socket up at %s:%s', host, port)
while True:
try:
logging.debug('Waiting for request')
server.handle_request()
except KeyboardInterrupt:
logging.debug('Received KeyboardInterrupt')
try:
server.server_close()
except Exception as e:
logging.exception(e)
server.server_close()
return
except ValueError:
#This seems to happen whenever the socket is closed somewhere else, but handle_request still runs
try:
server.server_close()
except Exception as e:
logging.exception(e)
#This seems to happen whenever the socket is closed somewhere else,
#but handle_request still runs
server.server_close()
break
try:
recvd_signal = socket_server_signal_queue.get_nowait()
@@ -408,7 +422,7 @@ def socket_server(file_parser_result_queue, overflowqueue, socket_server_signal_
except queue.Empty:
pass
except OSError as error:
logging.info('Waiting for Address to become available: ' + str(error))
logging.info('Waiting for Address to become available: %s', error)
time.sleep(1)
continue
@@ -417,24 +431,18 @@ def main():
config_section = 'collector'
setproctitle('routerstats-collector main-thread')
passwd_file = None
file_to_follow = None
server_port = None
var_dir = None
config = configparser.ConfigParser()
#parser = argparse.ArgumentParser(exit_on_error=False, prog='routerstats_collector', description='Collecting information from logfile and sending to routerstats_client')
parser = argparse.ArgumentParser(exit_on_error=False)
parser.add_argument('-c', '--config', help='config file to load')
parser.add_argument('-d', '--debug', action='store_true', help='enable debug')
args, remaining_args = parser.parse_known_args()
args, _ = parser.parse_known_args()
if args.debug:
logging.root.setLevel(logging.DEBUG)
logging.debug('Starting as PID ' + str(os.getpid()))
logging.debug('Starting as PID %s', os.getpid())
found = False
if args.config:
@@ -456,17 +464,38 @@ def main():
logging.error('No config file found')
sys.exit(0)
parser.add_argument('-f', '--file', dest='file_to_follow', help='Log file to follow', default=config[config_section]['logfile'])
parser.add_argument('-w', '--pwdfile', dest='passwd_file', help='password file', default=config[config_section]['passwd_file'])
parser.add_argument('-p', '--port', dest='server_port', type=int, help='tcp port to listen to', default=config[config_section]['port'])
parser.add_argument('-v', '--vardir', dest='var_dir', help='Location for queue dumps', default=config[config_section]['var_dir'])
parser.add_argument(
'-f',
'--file',
dest='file_to_follow',
help='Log file to follow',
default=config[config_section]['logfile'])
parser.add_argument(
'-w',
'--pwdfile',
dest='passwd_file',
help='password file',
default=config[config_section]['passwd_file'])
parser.add_argument(
'-p',
'--port',
dest='server_port',
type=int,
help='tcp port to listen to',
default=config[config_section]['port'])
parser.add_argument(
'-v',
'--vardir',
dest='var_dir',
help='Location for queue dumps',
default=config[config_section]['var_dir'])
args = parser.parse_args()
logging.debug(args)
#Just quit early if file is missing..
if os.path.isfile(args.file_to_follow) is False:
logging.error('Could not find log file ' + args.file_to_follow)
logging.error('Could not find log file %s', args.file_to_follow)
sys.exit()
if not os.path.isfile(args.passwd_file):
@@ -492,9 +521,18 @@ def main():
load_pickled_file(file_parser_result_queue, args.var_dir + '/dump.pickle')
start_pos = load_start_pos(args.file_to_follow, args.var_dir + '/position')
file_parser_process = Process(target=filefetcher, daemon=True, args=(args.file_to_follow, args.var_dir, file_parser_result_queue, file_parser_signal_queue, 0.5, start_pos))
file_parser_process = Process(
target=filefetcher,
daemon=True,
args=(
args.file_to_follow,
args.var_dir,
file_parser_result_queue,
file_parser_signal_queue,
0.5,
start_pos))
file_parser_process.start()
logging.debug('Started filefetcher as pid ' + str(file_parser_process.pid))
logging.debug('Started filefetcher as pid %s', file_parser_process.pid)
started_processes.append((file_parser_process, file_parser_signal_queue))
#We're not writing directly to an rrd,
@@ -504,16 +542,19 @@ def main():
#This means any "malicious" connections will wipe the history
#We're fine with this
socket_server_process = Process(target=socket_server, daemon=True, args=(file_parser_result_queue, overflowqueue, socket_server_signal_queue, args.passwd_file, args.server_port))
socket_server_process = Process(
target=socket_server,
daemon=True,
args=(
file_parser_result_queue,
overflowqueue,
socket_server_signal_queue,
args.passwd_file,
args.server_port))
socket_server_process.start()
logging.debug('Socket server started as pid ' + str(socket_server_process.pid))
logging.debug('Socket server started as pid %s', socket_server_process.pid)
started_processes.append((socket_server_process, socket_server_signal_queue))
#rrd_stuffer_process = Process(target=rrd_stuffer, args(file_parser_result_queue, rrd_stuffer_signal_queue))
#rrd_stuffer_process.start()
#started_prcesses.append(rrd_stuffer_process)
#signal_queues.append(rrd_stuffer_signal_queue)
signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:)
#No idea how to manage this better?
@@ -523,7 +564,7 @@ def main():
if p[0].is_alive():
pass
else:
logging.error(p[0].name + ' has died prematurely?')
logging.error('%s has died prematurely?', p[0].name)
p[0].join()
dead_processes += 1
if dead_processes >= len(started_processes):
@@ -536,7 +577,7 @@ def main():
p[1].put('Quit')
p[0].join(timeout=5)
if p[0].is_alive():
logging.error('Timeout waiting for shutdown, killing child PID: ' + str(p[0].pid))
logging.error('Timeout waiting for shutdown, killing child PID: %s', p[0].pid)
p[0].kill()
break