Handle dead/killed/interrupted (sub)processes better
This commit is contained in:
@@ -37,6 +37,7 @@ def filefetcher(
|
|||||||
sleep_sec=0.5,
|
sleep_sec=0.5,
|
||||||
seek_pos=None):
|
seek_pos=None):
|
||||||
'''Latch onto a file, putting any new lines onto the queue.'''
|
'''Latch onto a file, putting any new lines onto the queue.'''
|
||||||
|
|
||||||
setproctitle('routerstats-collector file-fetcher')
|
setproctitle('routerstats-collector file-fetcher')
|
||||||
if float(sleep_sec) <= 0.0:
|
if float(sleep_sec) <= 0.0:
|
||||||
logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)')
|
logging.error('Cannot have sleep_sec <= 0, this breaks the code and your computer:)')
|
||||||
@@ -386,6 +387,7 @@ def socket_server(
|
|||||||
#Multiple connections here is probably a horrible idea:)
|
#Multiple connections here is probably a horrible idea:)
|
||||||
setproctitle('routerstats-collector socket_server')
|
setproctitle('routerstats-collector socket_server')
|
||||||
host, port = '', server_port
|
host, port = '', server_port
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
socketserver.TCPServer.allow_reuse_address = True
|
socketserver.TCPServer.allow_reuse_address = True
|
||||||
@@ -402,7 +404,6 @@ def socket_server(
|
|||||||
logging.info('Socket up at %s:%s', host, port)
|
logging.info('Socket up at %s:%s', host, port)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
logging.debug('Waiting for request')
|
|
||||||
server.handle_request()
|
server.handle_request()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.debug('Received KeyboardInterrupt')
|
logging.debug('Received KeyboardInterrupt')
|
||||||
@@ -416,7 +417,7 @@ def socket_server(
|
|||||||
try:
|
try:
|
||||||
recvd_signal = socket_server_signal_queue.get_nowait()
|
recvd_signal = socket_server_signal_queue.get_nowait()
|
||||||
if recvd_signal == 'Quit':
|
if recvd_signal == 'Quit':
|
||||||
logging.debug('Received Quit')
|
logging.warning('Shutting down socketserver')
|
||||||
server.server_close()
|
server.server_close()
|
||||||
return
|
return
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
@@ -516,7 +517,6 @@ def main():
|
|||||||
socket_server_signal_queue = Queue()
|
socket_server_signal_queue = Queue()
|
||||||
|
|
||||||
started_processes = []
|
started_processes = []
|
||||||
dead_processes = 0
|
|
||||||
|
|
||||||
load_pickled_file(file_parser_result_queue, args.var_dir + '/dump.pickle')
|
load_pickled_file(file_parser_result_queue, args.var_dir + '/dump.pickle')
|
||||||
start_pos = load_start_pos(args.file_to_follow, args.var_dir + '/position')
|
start_pos = load_start_pos(args.file_to_follow, args.var_dir + '/position')
|
||||||
@@ -556,6 +556,7 @@ def main():
|
|||||||
started_processes.append((socket_server_process, socket_server_signal_queue))
|
started_processes.append((socket_server_process, socket_server_signal_queue))
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:)
|
signal.signal(signal.SIGTERM, signal_handler) #Make sure subthreads get the info:)
|
||||||
|
signal.signal(signal.SIGINT, signal_handler)
|
||||||
|
|
||||||
#No idea how to manage this better?
|
#No idea how to manage this better?
|
||||||
while True:
|
while True:
|
||||||
@@ -566,12 +567,10 @@ def main():
|
|||||||
else:
|
else:
|
||||||
logging.error('%s has died prematurely?', p[0].name)
|
logging.error('%s has died prematurely?', p[0].name)
|
||||||
p[0].join()
|
p[0].join()
|
||||||
dead_processes += 1
|
raise TimeToQuit
|
||||||
if dead_processes >= len(started_processes):
|
|
||||||
logging.error('All processes has gone away :/')
|
|
||||||
sys.exit()
|
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
except (KeyboardInterrupt, TimeToQuit):
|
except (KeyboardInterrupt, TimeToQuit):
|
||||||
|
logging.warning('Asked to quit via exception TimeToQuit')
|
||||||
for p in started_processes:
|
for p in started_processes:
|
||||||
if p[0].is_alive():
|
if p[0].is_alive():
|
||||||
p[1].put('Quit')
|
p[1].put('Quit')
|
||||||
|
|||||||
Reference in New Issue
Block a user