12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723 |
- from __future__ import with_statement
- import os
- import subprocess
- import sys
- import threading
- import functools
- import contextlib
- import logging
- import re
- import time
- from concurrent import futures
- from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
- try:
- import unittest2 as unittest
- except ImportError:
- import unittest
- try:
- from StringIO import StringIO
- except ImportError:
- from io import StringIO
- try:
- from test import test_support
- except ImportError:
- from test import support as test_support
# Compatibility shim: if the name ``next`` is not defined (the bare reference
# below raises NameError), provide a stand-in that calls the iterator's
# ``.next()`` method instead.
try:
    next
except NameError:
    next = lambda x: x.next()
def reap_threads(func):
    """Decorate *func* so thread bookkeeping is cleaned up after it runs.

    ``test_support.threading_setup()`` is called before *func* and its
    result is handed to ``test_support.threading_cleanup()`` in a ``finally``
    clause, so the cleanup also happens when *func* raises.

    Positional and keyword arguments are forwarded to *func* unchanged and
    its return value is passed through.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = test_support.threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            test_support.threading_cleanup(*key)
    return decorator
# Executing the interpreter in a subprocess
def _assert_python(expected_success, *args, **env_vars):
    """Run ``sys.executable`` with *args* and check its exit status.

    *env_vars* are added to the child's environment.  When no keyword
    arguments are given at all, ``-E`` is passed to the interpreter.  A true
    ``__cleanenv`` keyword makes the child start from an empty environment
    instead of a copy of ``os.environ``.

    Returns ``(return code, stdout, stderr)`` where stderr has been passed
    through ``strip_python_stderr``.  Raises AssertionError when the exit
    status does not agree with *expected_success*.
    """
    command = [sys.executable]
    if not env_vars:
        command.append('-E')
    # A special flag lets the caller opt out of inheriting the environment;
    # in that case the caller is responsible for passing the full
    # environment.  Otherwise the original environment is preserved, for
    # in-place testing of shared library builds.
    if env_vars.pop('__cleanenv', None):
        child_env = {}
    else:
        child_env = os.environ.copy()
    child_env.update(env_vars)
    command.extend(args)
    proc = subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=child_env)
    try:
        out, err = proc.communicate()
    finally:
        subprocess._cleanup()
        proc.stdout.close()
        proc.stderr.close()
    rc = proc.returncode
    err = strip_python_stderr(err)
    # A non-zero exit status is a failure; complain when failure/success is
    # the opposite of what the caller expected.
    failed = bool(rc)
    if failed == bool(expected_success):
        raise AssertionError(
            "Process return code is %d, "
            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
    return rc, out, err
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, asserting that it succeeds.  Returns the
    (return code, stdout, stderr) tuple produced by `_assert_python`.
    """
    outcome = _assert_python(True, *args, **env_vars)
    return outcome
def strip_python_stderr(stderr):
    """Remove a trailing ``[<digits> refs]`` line from *stderr* (bytes).

    The trailer, with an optional CR/LF after it, is removed only when it
    sits at the very end of the data; the result is then stripped of
    leading and trailing whitespace.  Intended for the stderr returned by
    ``subprocess.Popen.communicate()``.
    """
    refs_trailer = r"\[\d+ refs\]\r?\n?$".encode()
    without_trailer = re.sub(refs_trailer, "".encode(), stderr)
    return without_trailer.strip()
@contextlib.contextmanager
def captured_stderr():
    """Context manager that collects root-logger output in a StringIO.

    A ``logging.StreamHandler`` writing to a fresh StringIO is attached to
    the root logger for the duration of the ``with`` block and removed again
    afterwards, even if the block raises.  The StringIO is the value bound
    by ``as``.  Note that ``sys.stderr`` itself is not replaced.
    """
    buffer = StringIO()
    buffer_handler = logging.StreamHandler(buffer)
    root_logger = logging.root
    root_logger.addHandler(buffer_handler)
    try:
        yield buffer
    finally:
        root_logger.removeHandler(buffer_handler)
def create_future(state=PENDING, exception=None, result=None):
    """Build a Future whose private state fields are preset.

    The ``_state``, ``_exception`` and ``_result`` attributes of a new
    Future are overwritten directly with the given values, so the future can
    be placed in any state without an executor running it.
    """
    future = Future()
    future._state, future._exception, future._result = state, exception, result
    return future
# Shared, pre-built futures -- one per state (plus a failed and a successful
# FINISHED one) -- created once at import time and reused by the tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
# Finished by raising: carries an IOError instance as its exception.
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
# Finished normally: carries 42 as its result.
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
def mul(x, y):
    """Return ``x * y``; a module-level callable the tests submit to executors."""
    product = x * y
    return product
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise ``Exception('this is an exception')``."""
    failure = Exception('this is an exception')
    time.sleep(t)
    raise failure
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, print *msg*, then flush ``sys.stdout``."""
    time.sleep(t)
    stream = sys.stdout
    print(msg)
    # Flush explicitly so the text is written out rather than left buffered.
    stream.flush()
class ExecutorMixin:
    """setUp/tearDown plumbing shared by the executor-based test cases.

    Subclasses provide ``executor_type``; setUp instantiates it with
    ``worker_count`` workers and stores it as ``self.executor``.
    """

    # Number of workers requested from the executor (and of warm-up jobs).
    worker_count = 5

    def setUp(self):
        # Remember the start time so tearDown can report the duration.
        self.t1 = time.time()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError:
            # The executor type cannot be created here: skip the test,
            # using the exception text as the reason.
            reason = sys.exc_info()[1]
            self.skipTest(str(reason))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        elapsed = time.time() - self.t1
        if test_support.verbose:
            print("%.2fs" % elapsed)
        self.assertLess(elapsed, 60,
                        "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        warmups = []
        for _ in range(self.worker_count):
            warmups.append(self.executor.submit(time.sleep, 0.1))
        for warmup in warmups:
            warmup.result()
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin whose executor is a ``futures.ThreadPoolExecutor``."""
    # Class instantiated by ExecutorMixin.setUp.
    executor_type = futures.ThreadPoolExecutor
class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin whose executor is a ``futures.ProcessPoolExecutor``."""
    # Class instantiated by ExecutorMixin.setUp.
    executor_type = futures.ProcessPoolExecutor
class ExecutorShutdownTest(unittest.TestCase):
    """Shutdown checks shared by the thread and process pool variants.

    The tests rely on ``self.executor`` and ``self.executor_type``, which
    this class does not define itself; the concrete subclasses in this file
    get them from an executor mixin.
    """

    def test_run_after_shutdown(self):
        # Submitting work after shutdown() must raise RuntimeError.
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        executor_name = self.executor_type.__name__
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import %s
            from time import sleep
            from test_futures import sleep_and_print
            t = %s(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """ % (executor_name, executor_name))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), "apple".encode())

    def test_hang_issue12364(self):
        # Queue 50 short jobs, shut down, then collect every result.
        submitted = []
        for _ in range(50):
            submitted.append(self.executor.submit(time.sleep, 0.1))
        self.executor.shutdown()
        for job in submitted:
            job.result()
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
    """Shutdown tests run against ThreadPoolExecutor.

    Adds checks that the pool's worker threads (``executor._threads``) can
    be joined after an explicit shutdown, after leaving a ``with`` block,
    and after the executor object is deleted.
    """

    def _prime_executor(self):
        # Skip the warm-up jobs of ExecutorMixin.
        # NOTE(review): presumably so that no worker threads exist before
        # test_threads_terminate counts them -- confirm.
        pass

    def test_threads_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        # Three submissions are expected to leave three threads registered.
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        # join() only returns once each worker thread has exited.
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # The with block has been left; the workers must now be joinable.
        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        # Keep the thread set, then drop this test's reference to the
        # executor without calling shutdown().
        threads = executor._threads
        del executor

        for t in threads:
            t.join()
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
    """Shutdown tests run against ProcessPoolExecutor.

    Adds checks that the pool's worker processes (``executor._processes``)
    can be joined after an explicit shutdown, after leaving a ``with``
    block, and after the executor object is deleted.
    """

    def _prime_executor(self):
        # Skip the warm-up jobs of ExecutorMixin.
        # NOTE(review): reason not visible here; mirrors the thread pool
        # variant -- confirm.
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        # Five processes are expected (worker_count is 5), although only
        # three jobs were submitted.
        self.assertEqual(len(self.executor._processes), 5)
        # Grab the collection before shutdown(), then join each process.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes:
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # The with block has been left; the workers must now be joinable.
        for p in processes:
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        # Keep references to the management thread and the processes, then
        # drop this test's reference to the executor without shutdown().
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor

        queue_management_thread.join()
        for p in processes:
            p.join()
class WaitTests(unittest.TestCase):
    """Checks of ``futures.wait()`` for each ``return_when`` mode and a timeout.

    The tests use ``self.executor``, which this class does not create; the
    concrete subclasses in this file obtain it from an executor mixin.
    """

    def test_first_completed(self):
        # A quick job finishes while a 1.5 s sleep is still outstanding.
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [CANCELLED_FUTURE, quick, slow]

        done, not_done = futures.wait(
            watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([quick]), done)
        self.assertEqual(set([CANCELLED_FUTURE, slow]), not_done)

    def test_first_completed_some_already_completed(self):
        # Futures that are already done are reported without waiting for
        # the sleeping job.
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
            finished)
        self.assertEqual(set([slow]), pending)

    def test_first_exception(self):
        # The job raising after 1.5 s ends the wait; the 3 s sleep is left
        # pending.
        quick = self.executor.submit(mul, 2, 21)
        failing = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)
        watched = [quick, failing, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([quick, failing]), finished)
        self.assertEqual(set([slow]), pending)

    def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises ZeroDivisionError.
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [SUCCESSFUL_FUTURE,
                   CANCELLED_FUTURE,
                   CANCELLED_AND_NOTIFIED_FUTURE,
                   failing, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              failing]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, slow]), pending)

    def test_first_exception_one_already_failed(self):
        # An already-failed future satisfies FIRST_EXCEPTION on its own.
        slow = self.executor.submit(time.sleep, 2)
        watched = [EXCEPTION_FUTURE, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([slow]), pending)

    def test_all_completed(self):
        # With ALL_COMPLETED every future, failed or not, ends up finished.
        failing = self.executor.submit(divmod, 2, 0)
        quick = self.executor.submit(mul, 2, 21)
        watched = [SUCCESSFUL_FUTURE,
                   CANCELLED_AND_NOTIFIED_FUTURE,
                   EXCEPTION_FUTURE,
                   failing,
                   quick]

        finished, pending = futures.wait(
            watched, return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              failing,
                              quick]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        # The 1.5 s timeout expires before the 3 s sleep completes.
        quick = self.executor.submit(mul, 6, 7)
        slow = self.executor.submit(time.sleep, 3)
        watched = [CANCELLED_AND_NOTIFIED_FUTURE,
                   EXCEPTION_FUTURE,
                   SUCCESSFUL_FUTURE,
                   quick, slow]

        finished, pending = futures.wait(
            watched,
            timeout=1.5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              quick]), finished)
        self.assertEqual(set([slow]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
    """WaitTests run against ThreadPoolExecutor, plus a thread race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()

        def future_func():
            event.wait()

        # Make the interpreter switch threads as often as possible while the
        # futures complete.  sys.setcheckinterval() no longer exists on
        # recent Python 3 releases, so prefer the switch-interval API when
        # the interpreter offers it and fall back to the check interval
        # otherwise.
        if hasattr(sys, 'setswitchinterval'):
            get_interval = sys.getswitchinterval
            set_interval = sys.setswitchinterval
            tight_interval = 1e-6
        else:
            get_interval = sys.getcheckinterval
            set_interval = sys.setcheckinterval
            tight_interval = 1

        old_interval = get_interval()
        set_interval(tight_interval)
        try:
            fs = set(self.executor.submit(future_func) for i in range(100))
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter-wide setting.
            set_interval(old_interval)
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
    """WaitTests run against ProcessPoolExecutor; adds no tests of its own."""
    pass
class AsCompletedTests(unittest.TestCase):
    """Checks of ``futures.as_completed()`` without and with a zero timeout.

    The tests use ``self.executor``, which this class does not create; the
    concrete subclasses in this file obtain it from an executor mixin.
    """

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        # Without a timeout every future passed in is eventually yielded.
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        watched = [CANCELLED_AND_NOTIFIED_FUTURE,
                   EXCEPTION_FUTURE,
                   SUCCESSFUL_FUTURE,
                   first, second]

        completed = set(futures.as_completed(watched))

        self.assertEqual(set(watched), completed)

    def test_zero_timeout(self):
        # With timeout=0 only the futures that are already done are yielded
        # before TimeoutError; the 2 s sleep is not among them.
        slow = self.executor.submit(time.sleep, 2)
        watched = [CANCELLED_AND_NOTIFIED_FUTURE,
                   EXCEPTION_FUTURE,
                   SUCCESSFUL_FUTURE,
                   slow]
        completed_futures = set()
        try:
            for done in futures.as_completed(watched, timeout=0):
                completed_futures.add(done)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
    """AsCompletedTests run against ThreadPoolExecutor; adds no tests."""
    pass
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
    """AsCompletedTests run against ProcessPoolExecutor; adds no tests."""
    pass
class ExecutorTest(unittest.TestCase):
    """Checks of ``submit()`` and ``map()`` on ``self.executor``.

    ``self.executor`` is not created here; the concrete subclasses in this
    file obtain it from an executor mixin.
    """

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        submitted = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, submitted.result())

    def test_submit_keyword(self):
        # Keyword arguments must reach the submitted callable.
        submitted = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, submitted.result())

    def test_map(self):
        # Executor.map must agree with the builtin map.
        from_executor = list(self.executor.map(pow, range(10), range(10)))
        from_builtin = list(map(pow, range(10), range(10)))
        self.assertEqual(from_executor, from_builtin)

    def test_map_exception(self):
        # The third pair divides by zero; the first two results come first.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        self.assertRaises(ZeroDivisionError, next, results)

    def test_map_timeout(self):
        # The 3 s sleep outlasts the 1.5 s timeout, so only the first two
        # results are collected before TimeoutError.
        results = []
        try:
            for value in self.executor.map(time.sleep,
                                           [0, 0, 3],
                                           timeout=1.5):
                results.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
    """ExecutorTest run against ThreadPoolExecutor; adds no tests."""
    pass
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
    """ExecutorTest run against ProcessPoolExecutor; adds no tests."""
    pass
class FutureTests(unittest.TestCase):
    """Tests of the Future object itself; no executor is involved.

    Futures are either created directly or taken from the module-level
    pre-built futures (one per state).
    """

    def test_done_callback_with_result(self):
        # One-element lists let the nested callback store a value.
        callback_result = [None]

        def fn(callback_future):
            callback_result[0] = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result[0])

    def test_done_callback_with_exception(self):
        callback_exception = [None]

        def fn(callback_future):
            callback_exception[0] = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception[0].args)

    def test_done_callback_with_cancel(self):
        was_cancelled = [None]

        def fn(callback_future):
            was_cancelled[0] = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled[0])

    def test_done_callback_raises(self):
        # A callback that raises must not prevent later callbacks from
        # running, and the failure must show up in the captured log output.
        with captured_stderr() as stderr:
            raising_was_called = [False]
            fn_was_called = [False]

            def raising_fn(callback_future):
                raising_was_called[0] = True
                raise Exception('doh!')

            def fn(callback_future):
                fn_was_called[0] = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            # Check the stored flag, not the list: a non-empty list is
            # always true, which would make these assertions vacuous.
            self.assertTrue(raising_was_called[0])
            self.assertTrue(fn_was_called[0])
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # A callback added after completion still gets called.
        callback_result = [None]

        def fn(callback_future):
            callback_result[0] = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result[0])

    def test_done_callback_already_failed(self):
        callback_exception = [None]

        def fn(callback_future):
            callback_exception[0] = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception[0].args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = [None]

        def fn(callback_future):
            was_cancelled[0] = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled[0])

    def test_repr(self):
        self.assertRegexpMatches(repr(PENDING_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegexpMatches(repr(RUNNING_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegexpMatches(repr(CANCELLED_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegexpMatches(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
        self.assertRegexpMatches(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # Fresh futures are used because cancel() may change their state.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=IOError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependant.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertEqual(f1.result(timeout=5), 42)
        finally:
            # Do not leave the helper thread running past the test.
            t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependant.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        finally:
            # Do not leave the helper thread running past the test.
            t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), IOError)
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Complete the future by hand, under its condition, and wake
            # the waiter.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = IOError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertIsInstance(f1.exception(timeout=5), IOError)
        finally:
            # Do not leave the helper thread running past the test.
            t.join()
@reap_threads
def test_main():
    """Run every concrete test case of this module via ``test_support``.

    ``test_support.reap_children()`` is called afterwards, whether or not
    the run raised.
    """
    test_cases = (
        ProcessPoolExecutorTest,
        ThreadPoolExecutorTest,
        ProcessPoolWaitTests,
        ThreadPoolWaitTests,
        ProcessPoolAsCompletedTests,
        ThreadPoolAsCompletedTests,
        FutureTests,
        ProcessPoolShutdownTest,
        ThreadPoolShutdownTest,
    )
    try:
        test_support.run_unittest(*test_cases)
    finally:
        test_support.reap_children()
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
|