timeout_guard.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. ## @package timeout_guard
  2. # Module caffe2.python.timeout_guard
  3. import contextlib
  4. import threading
  5. import os
  6. import time
  7. import signal
  8. import logging
  9. from future.utils import viewitems
  10. '''
  11. Sometimes CUDA devices can get stuck ('deadlock'). In this case it is often
  12. better just to kill the process automatically. Use this guard to set a
  13. maximum timespan for a Python call, such as RunNet(). If it does not complete
  14. in time, the process is killed.
  15. Example usage:
  16. with timeout_guard.CompleteInTimeOrDie(10.0):
  17. core.RunNet(...)
  18. '''
  19. class WatcherThread(threading.Thread):
  20. def __init__(self, timeout_secs):
  21. threading.Thread.__init__(self)
  22. self.timeout_secs = timeout_secs
  23. self.completed = False
  24. self.condition = threading.Condition()
  25. self.daemon = True
  26. self.caller_thread = threading.current_thread()
  27. def run(self):
  28. started = time.time()
  29. self.condition.acquire()
  30. while time.time() - started < self.timeout_secs and not self.completed:
  31. self.condition.wait(self.timeout_secs - (time.time() - started))
  32. self.condition.release()
  33. if not self.completed:
  34. log = logging.getLogger("timeout_guard")
  35. log.error("Call did not finish in time. Timeout:{}s PID: {}".format(
  36. self.timeout_secs,
  37. os.getpid(),
  38. ))
  39. # First try dying cleanly, but in 10 secs, exit properly
  40. def forcequit():
  41. time.sleep(10.0)
  42. log.info("Prepared output, dumping threads. ")
  43. print("Caller thread was: {}".format(self.caller_thread))
  44. print("-----After force------")
  45. log.info("-----After force------")
  46. import sys
  47. import traceback
  48. code = []
  49. for threadId, stack in viewitems(sys._current_frames()):
  50. if threadId == self.caller_thread.ident:
  51. code.append("\n# ThreadID: %s" % threadId)
  52. for filename, lineno, name, line in traceback.extract_stack(stack):
  53. code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
  54. if line:
  55. code.append(" %s" % (line.strip()))
  56. # Log also with logger, as it is comment practice to suppress print().
  57. print("\n".join(code))
  58. log.info("\n".join(code))
  59. log.error("Process did not terminate cleanly in 10 s, forcing")
  60. os.abort()
  61. forcet = threading.Thread(target=forcequit, args=())
  62. forcet.daemon = True
  63. forcet.start()
  64. print("Caller thread was: {}".format(self.caller_thread))
  65. print("-----Before forcing------")
  66. import sys
  67. import traceback
  68. code = []
  69. for threadId, stack in viewitems(sys._current_frames()):
  70. code.append("\n# ThreadID: %s" % threadId)
  71. for filename, lineno, name, line in traceback.extract_stack(stack):
  72. code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
  73. if line:
  74. code.append(" %s" % (line.strip()))
  75. # Log also with logger, as it is comment practice to suppress print().
  76. print("\n".join(code))
  77. log.info("\n".join(code))
  78. os.kill(os.getpid(), signal.SIGINT)
  79. @contextlib.contextmanager
  80. def CompleteInTimeOrDie(timeout_secs):
  81. watcher = WatcherThread(timeout_secs)
  82. watcher.start()
  83. yield
  84. watcher.completed = True
  85. watcher.condition.acquire()
  86. watcher.condition.notify()
  87. watcher.condition.release()
  88. def EuthanizeIfNecessary(timeout_secs=120):
  89. '''
  90. Call this if you have problem with process getting stuck at shutdown.
  91. It will kill the process if it does not terminate in timeout_secs.
  92. '''
  93. watcher = WatcherThread(timeout_secs)
  94. watcher.start()