From e6c14e9321fbb161f4a725e6b3c9c6170c3673d6 Mon Sep 17 00:00:00 2001 From: Eugene Vorontsov Date: Sat, 24 Aug 2019 15:57:04 -0400 Subject: [PATCH 1/4] Fix unclean thread termination to flush events to file on script termination, garbage collection, or interrupt. --- tensorboardX/event_file_writer.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tensorboardX/event_file_writer.py b/tensorboardX/event_file_writer.py index 3f3d0ced..106d1382 100644 --- a/tensorboardX/event_file_writer.py +++ b/tensorboardX/event_file_writer.py @@ -18,6 +18,7 @@ from __future__ import division from __future__ import print_function +import atexit import os import socket import threading @@ -111,6 +112,7 @@ def __init__(self, logdir, max_queue_size=10, flush_secs=120, filename_suffix='' flush_secs) self._worker.start() + atexit.register(self.close) def get_logdir(self): """Returns the directory where event file will be written.""" @@ -154,11 +156,15 @@ def close(self): need the summary writer anymore. """ if not self._closed: - self.flush() self._worker.stop() + self._worker.join() + self.flush() self._ev_writer.close() self._closed = True + def __del__(self): + self.close() + class _EventLoggerThread(threading.Thread): """Thread that logs events.""" @@ -173,6 +179,15 @@ def __init__(self, queue, record_writer, flush_secs): pending file to disk. """ threading.Thread.__init__(self) + # NOTE: although this thread writes to disk, it is a daemon thread + # so that the python interpretor does not wait until the thread + # completes to begin tearing down the environment at the end of a + # script. Instead, an `atexit` function ensures clean termination + # of this thread before the rest of the environment is torn down. + # If this thread were not a daemon, the user would have to manually + # call close() on the summary writer that spawns this thread in order + # to terminate it and allow the main process to exit at the end of + # a script. 
self.daemon = True self._queue = queue self._record_writer = record_writer @@ -184,7 +199,6 @@ def __init__(self, queue, record_writer, flush_secs): def stop(self): self._queue.put(self._shutdown_signal) - self.join() def run(self): # Here wait on the queue until an data appears, or till the next From 16d52a0b31ec98f9e3f9da090be59975e49e0c45 Mon Sep 17 00:00:00 2001 From: Eugene Vorontsov Date: Sat, 24 Aug 2019 17:13:22 -0400 Subject: [PATCH 2/4] Fix issue where the event writer and flusher thread cannot be garbage collected until the end of the script. --- tensorboardX/event_file_writer.py | 53 ++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/tensorboardX/event_file_writer.py b/tensorboardX/event_file_writer.py index 106d1382..62e3ba82 100644 --- a/tensorboardX/event_file_writer.py +++ b/tensorboardX/event_file_writer.py @@ -18,11 +18,11 @@ from __future__ import division from __future__ import print_function -import atexit import os import socket import threading import time +import weakref import six @@ -110,9 +110,23 @@ def __init__(self, logdir, max_queue_size=10, flush_secs=120, filename_suffix='' self._closed = False self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, flush_secs) - + # Make sure that at exit, the thread is shut down cleanly and events + # are flushed to disk. A weak reference is used to not block garbage + # collection of this object and the thread. + def thread_finalizer(ref): + obj = ref() + if obj is not None: + # If the `EventFileWriter` object instance is deleted, then + # `obj` is `None`; do nothing. This means `__del__` has + # already been called and has already called `_close(obj)`. + # This can happen when this thread finalizer is called at + # exit after the thread has already been terminated earlier + # in the code. 
+ EventFileWriter._close(obj) + self._thread_finalizer = weakref.finalize(self._worker, + thread_finalizer, + weakref.ref(self)) self._worker.start() - atexit.register(self.close) def get_logdir(self): """Returns the directory where event file will be written.""" @@ -155,12 +169,21 @@ def close(self): write/flush worker and closes the file. Call this method when you do not need the summary writer anymore. """ - if not self._closed: - self._worker.stop() - self._worker.join() - self.flush() - self._ev_writer.close() - self._closed = True + EventFileWriter._close(self) + + @staticmethod + def _close(obj): + """This method is static so that a `weakref.finalize` could run it + without storing a reference to the object that produces the `close()` + method, thus allowing this object to be garbage collected before the + interpreter begins exit procedures. + """ + if not obj._closed: + obj._worker.stop() + obj._worker.join() + obj.flush() + obj._ev_writer.close() + obj._closed = True def __del__(self): self.close() @@ -182,12 +205,12 @@ def __init__(self, queue, record_writer, flush_secs): # NOTE: although this thread writes to disk, it is a daemon thread # so that the python interpretor does not wait until the thread # completes to begin tearing down the environment at the end of a - # script. Instead, an `atexit` function ensures clean termination - # of this thread before the rest of the environment is torn down. - # If this thread were not a daemon, the user would have to manually - # call close() on the summary writer that spawns this thread in order - # to terminate it and allow the main process to exit at the end of - # a script. + # script. Instead, a weakref finalizer ensures clean termination + # of this thread at exit before the rest of the environment is torn + # down. 
If this thread were not a daemon, the user would have to + # manuallycall close() on the summary writer that spawns this thread + # in orderto terminate it and allow the main process to exit at the + # end of a script. self.daemon = True self._queue = queue self._record_writer = record_writer From eb41d9e768898146c251e52589e2921c11a3c578 Mon Sep 17 00:00:00 2001 From: Eugene Vorontsov Date: Sat, 24 Aug 2019 22:13:17 -0400 Subject: [PATCH 3/4] Support python2 with `atexit` instead of `weakref.finalize`. --- tensorboardX/event_file_writer.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tensorboardX/event_file_writer.py b/tensorboardX/event_file_writer.py index 62e3ba82..82535622 100644 --- a/tensorboardX/event_file_writer.py +++ b/tensorboardX/event_file_writer.py @@ -18,6 +18,7 @@ from __future__ import division from __future__ import print_function +import atexit import os import socket import threading @@ -123,9 +124,14 @@ def thread_finalizer(ref): # exit after the thread has already been terminated earlier # in the code. EventFileWriter._close(obj) - self._thread_finalizer = weakref.finalize(self._worker, - thread_finalizer, - weakref.ref(self)) + if hasattr(weakref, 'finalize'): + # Python 3 + self._thread_finalizer = weakref.finalize(self._worker, + thread_finalizer, + weakref.ref(self)) + else: + # Python 2 + atexit.register(thread_finalizer, weakref.ref(self)) self._worker.start() def get_logdir(self): From 102cdb6a6b75d066626c5a47123b8a065a061705 Mon Sep 17 00:00:00 2001 From: Eugene Vorontsov Date: Sat, 24 Aug 2019 23:00:23 -0400 Subject: [PATCH 4/4] Fix typos in comments. 
--- tensorboardX/event_file_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorboardX/event_file_writer.py b/tensorboardX/event_file_writer.py index 82535622..7cad2a1c 100644 --- a/tensorboardX/event_file_writer.py +++ b/tensorboardX/event_file_writer.py @@ -214,8 +214,8 @@ def __init__(self, queue, record_writer, flush_secs): # script. Instead, a weakref finalizer ensures clean termination # of this thread at exit before the rest of the environment is torn # down. If this thread were not a daemon, the user would have to - # manuallycall close() on the summary writer that spawns this thread - # in orderto terminate it and allow the main process to exit at the + # manually call close() on the summary writer that spawns this thread + # in order to terminate it and allow the main process to exit at the # end of a script. self.daemon = True self._queue = queue