1
0

thread.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # Copyright 2009 Brian Quinlan. All Rights Reserved.
  2. # Licensed to PSF under a Contributor Agreement.
  3. """Implements ThreadPoolExecutor."""
  4. from __future__ import with_statement
  5. import atexit
  6. import threading
  7. import weakref
  8. import sys
  9. from concurrent.futures import _base
  10. try:
  11. import queue
  12. except ImportError:
  13. import Queue as queue
  14. __author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
  28. _threads_queues = weakref.WeakKeyDictionary()
  29. _shutdown = False
  30. def _python_exit():
  31. global _shutdown
  32. _shutdown = True
  33. items = list(_threads_queues.items())
  34. for t, q in items:
  35. q.put(None)
  36. for t, q in items:
  37. t.join()
  38. atexit.register(_python_exit)
  39. class _WorkItem(object):
  40. def __init__(self, future, fn, args, kwargs):
  41. self.future = future
  42. self.fn = fn
  43. self.args = args
  44. self.kwargs = kwargs
  45. def run(self):
  46. if not self.future.set_running_or_notify_cancel():
  47. return
  48. try:
  49. result = self.fn(*self.args, **self.kwargs)
  50. except BaseException:
  51. e = sys.exc_info()[1]
  52. self.future.set_exception(e)
  53. else:
  54. self.future.set_result(result)
  55. def _worker(executor_reference, work_queue):
  56. try:
  57. while True:
  58. work_item = work_queue.get(block=True)
  59. if work_item is not None:
  60. work_item.run()
  61. continue
  62. executor = executor_reference()
  63. # Exit if:
  64. # - The interpreter is shutting down OR
  65. # - The executor that owns the worker has been collected OR
  66. # - The executor that owns the worker has been shutdown.
  67. if _shutdown or executor is None or executor._shutdown:
  68. # Notice other workers
  69. work_queue.put(None)
  70. return
  71. del executor
  72. except BaseException:
  73. _base.LOGGER.critical('Exception in worker', exc_info=True)
  74. class ThreadPoolExecutor(_base.Executor):
  75. def __init__(self, max_workers):
  76. """Initializes a new ThreadPoolExecutor instance.
  77. Args:
  78. max_workers: The maximum number of threads that can be used to
  79. execute the given calls.
  80. """
  81. self._max_workers = max_workers
  82. self._work_queue = queue.Queue()
  83. self._threads = set()
  84. self._shutdown = False
  85. self._shutdown_lock = threading.Lock()
  86. def submit(self, fn, *args, **kwargs):
  87. with self._shutdown_lock:
  88. if self._shutdown:
  89. raise RuntimeError('cannot schedule new futures after shutdown')
  90. f = _base.Future()
  91. w = _WorkItem(f, fn, args, kwargs)
  92. self._work_queue.put(w)
  93. self._adjust_thread_count()
  94. return f
  95. submit.__doc__ = _base.Executor.submit.__doc__
  96. def _adjust_thread_count(self):
  97. # When the executor gets lost, the weakref callback will wake up
  98. # the worker threads.
  99. def weakref_cb(_, q=self._work_queue):
  100. q.put(None)
  101. # TODO(bquinlan): Should avoid creating new threads if there are more
  102. # idle threads than items in the work queue.
  103. if len(self._threads) < self._max_workers:
  104. t = threading.Thread(target=_worker,
  105. args=(weakref.ref(self, weakref_cb),
  106. self._work_queue))
  107. t.daemon = True
  108. t.start()
  109. self._threads.add(t)
  110. _threads_queues[t] = self._work_queue
  111. def shutdown(self, wait=True):
  112. with self._shutdown_lock:
  113. self._shutdown = True
  114. self._work_queue.put(None)
  115. if wait:
  116. for t in self._threads:
  117. t.join()
  118. shutdown.__doc__ = _base.Executor.shutdown.__doc__