Refactor out things from filefetcher, improve error messages
This commit is contained in:
@@ -29,6 +29,42 @@ class TimeToQuit(Exception):
|
|||||||
class ReloadLog(Exception):
|
class ReloadLog(Exception):
|
||||||
'''Used to reload log file'''
|
'''Used to reload log file'''
|
||||||
|
|
||||||
|
def wait_for_file(filename: str):
|
||||||
|
'''Busy loop waiting for file to exist, return return from open when file is opened'''
|
||||||
|
loops = 0
|
||||||
|
logging.debug('Trying to open file %s', filename)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
input_file = open(filename, 'r', encoding='utf-8')
|
||||||
|
return input_file
|
||||||
|
except FileNotFoundError:
|
||||||
|
logging.debug('%s not found, sleeping and retrying', filename)
|
||||||
|
loops += 1
|
||||||
|
if loops >= 10:
|
||||||
|
loops = 10
|
||||||
|
time.sleep(loops)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fetch_line_from_file(filehandle):
|
||||||
|
'''Loop until we have formed a complete line from filehandle'''
|
||||||
|
line = ''
|
||||||
|
while True:
|
||||||
|
tmp = filehandle.readline()
|
||||||
|
if tmp is not None and tmp != '':
|
||||||
|
line += tmp
|
||||||
|
if line.endswith("\n"):
|
||||||
|
line.rstrip()
|
||||||
|
if line.isspace():
|
||||||
|
logging.debug('Empty line is empty, thank you for the newline')
|
||||||
|
else:
|
||||||
|
return line
|
||||||
|
line = ''
|
||||||
|
else:
|
||||||
|
logging.debug('readline reported line with no \n?')
|
||||||
|
else:
|
||||||
|
#Either readline returned None, or ''
|
||||||
|
return None
|
||||||
|
|
||||||
def filefetcher(
|
def filefetcher(
|
||||||
filename: str,
|
filename: str,
|
||||||
output_directory: str,
|
output_directory: str,
|
||||||
@@ -44,13 +80,13 @@ def filefetcher(
|
|||||||
return False
|
return False
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
input_file = open(filename, 'r', encoding='utf-8')
|
input_file = wait_for_file(filename)
|
||||||
except FileNotFoundError:
|
if not input_file:
|
||||||
logging.debug('Retry opening %s', filename)
|
#Something is very wrong, the above function should only return
|
||||||
time.sleep(1)
|
#whenever it has opened a file
|
||||||
continue
|
logging.critical('Failed to open %s', filename)
|
||||||
|
break
|
||||||
|
|
||||||
line = ''
|
|
||||||
start_stat = os.stat(filename)
|
start_stat = os.stat(filename)
|
||||||
if seek_pos is None:
|
if seek_pos is None:
|
||||||
cur_pos = input_file.seek(0, 0)
|
cur_pos = input_file.seek(0, 0)
|
||||||
@@ -60,23 +96,13 @@ def filefetcher(
|
|||||||
logging.info(
|
logging.info(
|
||||||
'Following %s (inode %s) from pos %s', filename, start_stat.st_ino, cur_pos)
|
'Following %s (inode %s) from pos %s', filename, start_stat.st_ino, cur_pos)
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
while True:
|
||||||
tmp = input_file.readline()
|
line = fetch_line_from_file(input_file)
|
||||||
if tmp is not None and tmp != '':
|
if line:
|
||||||
line += tmp
|
|
||||||
if line.endswith("\n"):
|
|
||||||
line.rstrip()
|
|
||||||
if line.isspace():
|
|
||||||
logging.debug('Empty line is empty, thank you for the newline')
|
|
||||||
else:
|
|
||||||
logging.debug('Parsing line ending at pos %s', input_file.tell())
|
logging.debug('Parsing line ending at pos %s', input_file.tell())
|
||||||
parse_and_queue_line(line, collector_queue)
|
parse_and_queue_line(line, collector_queue)
|
||||||
line = ''
|
|
||||||
start_stat = os.stat(filename)
|
start_stat = os.stat(filename)
|
||||||
cur_pos = input_file.tell()
|
cur_pos = input_file.tell()
|
||||||
else:
|
|
||||||
logging.debug('readline reported line with no \n?')
|
|
||||||
else:
|
else:
|
||||||
#Using got_signal with a timeout of sleep_sec to rate limit the loopiness of this loop:)
|
#Using got_signal with a timeout of sleep_sec to rate limit the loopiness of this loop:)
|
||||||
any_signal = got_signal(signal_queue, sleep_sec)
|
any_signal = got_signal(signal_queue, sleep_sec)
|
||||||
@@ -86,9 +112,13 @@ def filefetcher(
|
|||||||
return True
|
return True
|
||||||
now_stat = os.stat(filename)
|
now_stat = os.stat(filename)
|
||||||
if now_stat.st_ino != start_stat.st_ino:
|
if now_stat.st_ino != start_stat.st_ino:
|
||||||
|
#Inode has changed
|
||||||
|
#implicit break at the bottom
|
||||||
|
#the following if definitions are just to see if we want to reset seek_pos or not
|
||||||
if now_stat.st_ctime == start_stat.st_ctime:
|
if now_stat.st_ctime == start_stat.st_ctime:
|
||||||
#Strange, inode has changed, but ctime is the same?
|
#Strange, inode has changed, but ctime (creation_time) is the same?
|
||||||
if now_stat.st_size >= start_stat.st_size:
|
if now_stat.st_size >= start_stat.st_size:
|
||||||
|
#Size is greater than or equal to last round
|
||||||
logging.warning(
|
logging.warning(
|
||||||
'New inode number, but same ctime?'
|
'New inode number, but same ctime?'
|
||||||
'Not sure how to handle this. Reopening, but keeping seek position...')
|
'Not sure how to handle this. Reopening, but keeping seek position...')
|
||||||
@@ -110,14 +140,21 @@ def filefetcher(
|
|||||||
return False
|
return False
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logging.debug('File gone away')
|
logging.debug('File gone away')
|
||||||
|
#Save progress, and hope it returns some day:)
|
||||||
|
seek_pos = save_current_pos(input_file, output_directory)
|
||||||
|
|
||||||
def shutdown_filefetcher(output_queue, input_file, output_directory):
|
def save_current_pos(input_file, output_directory):
|
||||||
'''Cleanly close filehandles, save log position and queue contents'''
|
'''As it say on the tin'''
|
||||||
cur_pos = input_file.tell()
|
cur_pos = input_file.tell()
|
||||||
input_file.close()
|
input_file.close()
|
||||||
with open(output_directory + 'position', 'w', encoding='utf-8') as output_file:
|
with open(output_directory + 'position', 'w', encoding='utf-8') as output_file:
|
||||||
logging.debug('Saving current position %s', cur_pos)
|
logging.debug('Saving current position %s to %s', cur_pos, output_directory + 'position')
|
||||||
output_file.write(str(cur_pos))
|
output_file.write(str(cur_pos))
|
||||||
|
return cur_pos
|
||||||
|
|
||||||
|
def shutdown_filefetcher(output_queue, input_file, output_directory):
|
||||||
|
'''Cleanly close filehandles, save log position and queue contents'''
|
||||||
|
save_current_pos(input_file, output_directory)
|
||||||
dump_queue(output_queue, output_directory + 'dump.pickle')
|
dump_queue(output_queue, output_directory + 'dump.pickle')
|
||||||
|
|
||||||
def got_signal(signal_queue: Queue, sleep_sec: float):
|
def got_signal(signal_queue: Queue, sleep_sec: float):
|
||||||
|
|||||||
Reference in New Issue
Block a user