test_futures.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723
  1. from __future__ import with_statement
  2. import os
  3. import subprocess
  4. import sys
  5. import threading
  6. import functools
  7. import contextlib
  8. import logging
  9. import re
  10. import time
  11. from concurrent import futures
  12. from concurrent.futures._base import (
  13. PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
  14. try:
  15. import unittest2 as unittest
  16. except ImportError:
  17. import unittest
  18. try:
  19. from StringIO import StringIO
  20. except ImportError:
  21. from io import StringIO
  22. try:
  23. from test import test_support
  24. except ImportError:
  25. from test import support as test_support
  26. try:
  27. next
  28. except NameError:
  29. next = lambda x: x.next()
  30. def reap_threads(func):
  31. """Use this function when threads are being used. This will
  32. ensure that the threads are cleaned up even when the test fails.
  33. If threading is unavailable this function does nothing.
  34. """
  35. @functools.wraps(func)
  36. def decorator(*args):
  37. key = test_support.threading_setup()
  38. try:
  39. return func(*args)
  40. finally:
  41. test_support.threading_cleanup(*key)
  42. return decorator
  43. # Executing the interpreter in a subprocess
  44. def _assert_python(expected_success, *args, **env_vars):
  45. cmd_line = [sys.executable]
  46. if not env_vars:
  47. cmd_line.append('-E')
  48. # Need to preserve the original environment, for in-place testing of
  49. # shared library builds.
  50. env = os.environ.copy()
  51. # But a special flag that can be set to override -- in this case, the
  52. # caller is responsible to pass the full environment.
  53. if env_vars.pop('__cleanenv', None):
  54. env = {}
  55. env.update(env_vars)
  56. cmd_line.extend(args)
  57. p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
  58. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  59. env=env)
  60. try:
  61. out, err = p.communicate()
  62. finally:
  63. subprocess._cleanup()
  64. p.stdout.close()
  65. p.stderr.close()
  66. rc = p.returncode
  67. err = strip_python_stderr(err)
  68. if (rc and expected_success) or (not rc and not expected_success):
  69. raise AssertionError(
  70. "Process return code is %d, "
  71. "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
  72. return rc, out, err
  73. def assert_python_ok(*args, **env_vars):
  74. """
  75. Assert that running the interpreter with `args` and optional environment
  76. variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
  77. """
  78. return _assert_python(True, *args, **env_vars)
  79. def strip_python_stderr(stderr):
  80. """Strip the stderr of a Python process from potential debug output
  81. emitted by the interpreter.
  82. This will typically be run on the result of the communicate() method
  83. of a subprocess.Popen object.
  84. """
  85. stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
  86. return stderr
  87. @contextlib.contextmanager
  88. def captured_stderr():
  89. """Return a context manager used by captured_stdout/stdin/stderr
  90. that temporarily replaces the sys stream *stream_name* with a StringIO."""
  91. logging_stream = StringIO()
  92. handler = logging.StreamHandler(logging_stream)
  93. logging.root.addHandler(handler)
  94. try:
  95. yield logging_stream
  96. finally:
  97. logging.root.removeHandler(handler)
  98. def create_future(state=PENDING, exception=None, result=None):
  99. f = Future()
  100. f._state = state
  101. f._exception = exception
  102. f._result = result
  103. return f
  104. PENDING_FUTURE = create_future(state=PENDING)
  105. RUNNING_FUTURE = create_future(state=RUNNING)
  106. CANCELLED_FUTURE = create_future(state=CANCELLED)
  107. CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
  108. EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
  109. SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
  110. def mul(x, y):
  111. return x * y
  112. def sleep_and_raise(t):
  113. time.sleep(t)
  114. raise Exception('this is an exception')
  115. def sleep_and_print(t, msg):
  116. time.sleep(t)
  117. print(msg)
  118. sys.stdout.flush()
  119. class ExecutorMixin:
  120. worker_count = 5
  121. def setUp(self):
  122. self.t1 = time.time()
  123. try:
  124. self.executor = self.executor_type(max_workers=self.worker_count)
  125. except NotImplementedError:
  126. e = sys.exc_info()[1]
  127. self.skipTest(str(e))
  128. self._prime_executor()
  129. def tearDown(self):
  130. self.executor.shutdown(wait=True)
  131. dt = time.time() - self.t1
  132. if test_support.verbose:
  133. print("%.2fs" % dt)
  134. self.assertLess(dt, 60, "synchronization issue: test lasted too long")
  135. def _prime_executor(self):
  136. # Make sure that the executor is ready to do work before running the
  137. # tests. This should reduce the probability of timeouts in the tests.
  138. futures = [self.executor.submit(time.sleep, 0.1)
  139. for _ in range(self.worker_count)]
  140. for f in futures:
  141. f.result()
  142. class ThreadPoolMixin(ExecutorMixin):
  143. executor_type = futures.ThreadPoolExecutor
  144. class ProcessPoolMixin(ExecutorMixin):
  145. executor_type = futures.ProcessPoolExecutor
  146. class ExecutorShutdownTest(unittest.TestCase):
  147. def test_run_after_shutdown(self):
  148. self.executor.shutdown()
  149. self.assertRaises(RuntimeError,
  150. self.executor.submit,
  151. pow, 2, 5)
  152. def test_interpreter_shutdown(self):
  153. # Test the atexit hook for shutdown of worker threads and processes
  154. rc, out, err = assert_python_ok('-c', """if 1:
  155. from concurrent.futures import %s
  156. from time import sleep
  157. from test_futures import sleep_and_print
  158. t = %s(5)
  159. t.submit(sleep_and_print, 1.0, "apple")
  160. """ % (self.executor_type.__name__, self.executor_type.__name__))
  161. # Errors in atexit hooks don't change the process exit code, check
  162. # stderr manually.
  163. self.assertFalse(err)
  164. self.assertEqual(out.strip(), "apple".encode())
  165. def test_hang_issue12364(self):
  166. fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
  167. self.executor.shutdown()
  168. for f in fs:
  169. f.result()
  170. class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
  171. def _prime_executor(self):
  172. pass
  173. def test_threads_terminate(self):
  174. self.executor.submit(mul, 21, 2)
  175. self.executor.submit(mul, 6, 7)
  176. self.executor.submit(mul, 3, 14)
  177. self.assertEqual(len(self.executor._threads), 3)
  178. self.executor.shutdown()
  179. for t in self.executor._threads:
  180. t.join()
  181. def test_context_manager_shutdown(self):
  182. with futures.ThreadPoolExecutor(max_workers=5) as e:
  183. executor = e
  184. self.assertEqual(list(e.map(abs, range(-5, 5))),
  185. [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
  186. for t in executor._threads:
  187. t.join()
  188. def test_del_shutdown(self):
  189. executor = futures.ThreadPoolExecutor(max_workers=5)
  190. executor.map(abs, range(-5, 5))
  191. threads = executor._threads
  192. del executor
  193. for t in threads:
  194. t.join()
  195. class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
  196. def _prime_executor(self):
  197. pass
  198. def test_processes_terminate(self):
  199. self.executor.submit(mul, 21, 2)
  200. self.executor.submit(mul, 6, 7)
  201. self.executor.submit(mul, 3, 14)
  202. self.assertEqual(len(self.executor._processes), 5)
  203. processes = self.executor._processes
  204. self.executor.shutdown()
  205. for p in processes:
  206. p.join()
  207. def test_context_manager_shutdown(self):
  208. with futures.ProcessPoolExecutor(max_workers=5) as e:
  209. processes = e._processes
  210. self.assertEqual(list(e.map(abs, range(-5, 5))),
  211. [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
  212. for p in processes:
  213. p.join()
  214. def test_del_shutdown(self):
  215. executor = futures.ProcessPoolExecutor(max_workers=5)
  216. list(executor.map(abs, range(-5, 5)))
  217. queue_management_thread = executor._queue_management_thread
  218. processes = executor._processes
  219. del executor
  220. queue_management_thread.join()
  221. for p in processes:
  222. p.join()
  223. class WaitTests(unittest.TestCase):
  224. def test_first_completed(self):
  225. future1 = self.executor.submit(mul, 21, 2)
  226. future2 = self.executor.submit(time.sleep, 1.5)
  227. done, not_done = futures.wait(
  228. [CANCELLED_FUTURE, future1, future2],
  229. return_when=futures.FIRST_COMPLETED)
  230. self.assertEqual(set([future1]), done)
  231. self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
  232. def test_first_completed_some_already_completed(self):
  233. future1 = self.executor.submit(time.sleep, 1.5)
  234. finished, pending = futures.wait(
  235. [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
  236. return_when=futures.FIRST_COMPLETED)
  237. self.assertEqual(
  238. set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
  239. finished)
  240. self.assertEqual(set([future1]), pending)
  241. def test_first_exception(self):
  242. future1 = self.executor.submit(mul, 2, 21)
  243. future2 = self.executor.submit(sleep_and_raise, 1.5)
  244. future3 = self.executor.submit(time.sleep, 3)
  245. finished, pending = futures.wait(
  246. [future1, future2, future3],
  247. return_when=futures.FIRST_EXCEPTION)
  248. self.assertEqual(set([future1, future2]), finished)
  249. self.assertEqual(set([future3]), pending)
  250. def test_first_exception_some_already_complete(self):
  251. future1 = self.executor.submit(divmod, 21, 0)
  252. future2 = self.executor.submit(time.sleep, 1.5)
  253. finished, pending = futures.wait(
  254. [SUCCESSFUL_FUTURE,
  255. CANCELLED_FUTURE,
  256. CANCELLED_AND_NOTIFIED_FUTURE,
  257. future1, future2],
  258. return_when=futures.FIRST_EXCEPTION)
  259. self.assertEqual(set([SUCCESSFUL_FUTURE,
  260. CANCELLED_AND_NOTIFIED_FUTURE,
  261. future1]), finished)
  262. self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
  263. def test_first_exception_one_already_failed(self):
  264. future1 = self.executor.submit(time.sleep, 2)
  265. finished, pending = futures.wait(
  266. [EXCEPTION_FUTURE, future1],
  267. return_when=futures.FIRST_EXCEPTION)
  268. self.assertEqual(set([EXCEPTION_FUTURE]), finished)
  269. self.assertEqual(set([future1]), pending)
  270. def test_all_completed(self):
  271. future1 = self.executor.submit(divmod, 2, 0)
  272. future2 = self.executor.submit(mul, 2, 21)
  273. finished, pending = futures.wait(
  274. [SUCCESSFUL_FUTURE,
  275. CANCELLED_AND_NOTIFIED_FUTURE,
  276. EXCEPTION_FUTURE,
  277. future1,
  278. future2],
  279. return_when=futures.ALL_COMPLETED)
  280. self.assertEqual(set([SUCCESSFUL_FUTURE,
  281. CANCELLED_AND_NOTIFIED_FUTURE,
  282. EXCEPTION_FUTURE,
  283. future1,
  284. future2]), finished)
  285. self.assertEqual(set(), pending)
  286. def test_timeout(self):
  287. future1 = self.executor.submit(mul, 6, 7)
  288. future2 = self.executor.submit(time.sleep, 3)
  289. finished, pending = futures.wait(
  290. [CANCELLED_AND_NOTIFIED_FUTURE,
  291. EXCEPTION_FUTURE,
  292. SUCCESSFUL_FUTURE,
  293. future1, future2],
  294. timeout=1.5,
  295. return_when=futures.ALL_COMPLETED)
  296. self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
  297. EXCEPTION_FUTURE,
  298. SUCCESSFUL_FUTURE,
  299. future1]), finished)
  300. self.assertEqual(set([future2]), pending)
  301. class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
  302. def test_pending_calls_race(self):
  303. # Issue #14406: multi-threaded race condition when waiting on all
  304. # futures.
  305. event = threading.Event()
  306. def future_func():
  307. event.wait()
  308. oldswitchinterval = sys.getcheckinterval()
  309. sys.setcheckinterval(1)
  310. try:
  311. fs = set(self.executor.submit(future_func) for i in range(100))
  312. event.set()
  313. futures.wait(fs, return_when=futures.ALL_COMPLETED)
  314. finally:
  315. sys.setcheckinterval(oldswitchinterval)
  316. class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
  317. pass
  318. class AsCompletedTests(unittest.TestCase):
  319. # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
  320. def test_no_timeout(self):
  321. future1 = self.executor.submit(mul, 2, 21)
  322. future2 = self.executor.submit(mul, 7, 6)
  323. completed = set(futures.as_completed(
  324. [CANCELLED_AND_NOTIFIED_FUTURE,
  325. EXCEPTION_FUTURE,
  326. SUCCESSFUL_FUTURE,
  327. future1, future2]))
  328. self.assertEqual(set(
  329. [CANCELLED_AND_NOTIFIED_FUTURE,
  330. EXCEPTION_FUTURE,
  331. SUCCESSFUL_FUTURE,
  332. future1, future2]),
  333. completed)
  334. def test_zero_timeout(self):
  335. future1 = self.executor.submit(time.sleep, 2)
  336. completed_futures = set()
  337. try:
  338. for future in futures.as_completed(
  339. [CANCELLED_AND_NOTIFIED_FUTURE,
  340. EXCEPTION_FUTURE,
  341. SUCCESSFUL_FUTURE,
  342. future1],
  343. timeout=0):
  344. completed_futures.add(future)
  345. except futures.TimeoutError:
  346. pass
  347. self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
  348. EXCEPTION_FUTURE,
  349. SUCCESSFUL_FUTURE]),
  350. completed_futures)
  351. class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
  352. pass
  353. class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
  354. pass
  355. class ExecutorTest(unittest.TestCase):
  356. # Executor.shutdown() and context manager usage is tested by
  357. # ExecutorShutdownTest.
  358. def test_submit(self):
  359. future = self.executor.submit(pow, 2, 8)
  360. self.assertEqual(256, future.result())
  361. def test_submit_keyword(self):
  362. future = self.executor.submit(mul, 2, y=8)
  363. self.assertEqual(16, future.result())
  364. def test_map(self):
  365. self.assertEqual(
  366. list(self.executor.map(pow, range(10), range(10))),
  367. list(map(pow, range(10), range(10))))
  368. def test_map_exception(self):
  369. i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
  370. self.assertEqual(next(i), (0, 1))
  371. self.assertEqual(next(i), (0, 1))
  372. self.assertRaises(ZeroDivisionError, next, i)
  373. def test_map_timeout(self):
  374. results = []
  375. try:
  376. for i in self.executor.map(time.sleep,
  377. [0, 0, 3],
  378. timeout=1.5):
  379. results.append(i)
  380. except futures.TimeoutError:
  381. pass
  382. else:
  383. self.fail('expected TimeoutError')
  384. self.assertEqual([None, None], results)
  385. class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
  386. pass
  387. class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
  388. pass
  389. class FutureTests(unittest.TestCase):
  390. def test_done_callback_with_result(self):
  391. callback_result = [None]
  392. def fn(callback_future):
  393. callback_result[0] = callback_future.result()
  394. f = Future()
  395. f.add_done_callback(fn)
  396. f.set_result(5)
  397. self.assertEqual(5, callback_result[0])
  398. def test_done_callback_with_exception(self):
  399. callback_exception = [None]
  400. def fn(callback_future):
  401. callback_exception[0] = callback_future.exception()
  402. f = Future()
  403. f.add_done_callback(fn)
  404. f.set_exception(Exception('test'))
  405. self.assertEqual(('test',), callback_exception[0].args)
  406. def test_done_callback_with_cancel(self):
  407. was_cancelled = [None]
  408. def fn(callback_future):
  409. was_cancelled[0] = callback_future.cancelled()
  410. f = Future()
  411. f.add_done_callback(fn)
  412. self.assertTrue(f.cancel())
  413. self.assertTrue(was_cancelled[0])
  414. def test_done_callback_raises(self):
  415. with captured_stderr() as stderr:
  416. raising_was_called = [False]
  417. fn_was_called = [False]
  418. def raising_fn(callback_future):
  419. raising_was_called[0] = True
  420. raise Exception('doh!')
  421. def fn(callback_future):
  422. fn_was_called[0] = True
  423. f = Future()
  424. f.add_done_callback(raising_fn)
  425. f.add_done_callback(fn)
  426. f.set_result(5)
  427. self.assertTrue(raising_was_called)
  428. self.assertTrue(fn_was_called)
  429. self.assertIn('Exception: doh!', stderr.getvalue())
  430. def test_done_callback_already_successful(self):
  431. callback_result = [None]
  432. def fn(callback_future):
  433. callback_result[0] = callback_future.result()
  434. f = Future()
  435. f.set_result(5)
  436. f.add_done_callback(fn)
  437. self.assertEqual(5, callback_result[0])
  438. def test_done_callback_already_failed(self):
  439. callback_exception = [None]
  440. def fn(callback_future):
  441. callback_exception[0] = callback_future.exception()
  442. f = Future()
  443. f.set_exception(Exception('test'))
  444. f.add_done_callback(fn)
  445. self.assertEqual(('test',), callback_exception[0].args)
  446. def test_done_callback_already_cancelled(self):
  447. was_cancelled = [None]
  448. def fn(callback_future):
  449. was_cancelled[0] = callback_future.cancelled()
  450. f = Future()
  451. self.assertTrue(f.cancel())
  452. f.add_done_callback(fn)
  453. self.assertTrue(was_cancelled[0])
  454. def test_repr(self):
  455. self.assertRegexpMatches(repr(PENDING_FUTURE),
  456. '<Future at 0x[0-9a-f]+ state=pending>')
  457. self.assertRegexpMatches(repr(RUNNING_FUTURE),
  458. '<Future at 0x[0-9a-f]+ state=running>')
  459. self.assertRegexpMatches(repr(CANCELLED_FUTURE),
  460. '<Future at 0x[0-9a-f]+ state=cancelled>')
  461. self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
  462. '<Future at 0x[0-9a-f]+ state=cancelled>')
  463. self.assertRegexpMatches(
  464. repr(EXCEPTION_FUTURE),
  465. '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
  466. self.assertRegexpMatches(
  467. repr(SUCCESSFUL_FUTURE),
  468. '<Future at 0x[0-9a-f]+ state=finished returned int>')
  469. def test_cancel(self):
  470. f1 = create_future(state=PENDING)
  471. f2 = create_future(state=RUNNING)
  472. f3 = create_future(state=CANCELLED)
  473. f4 = create_future(state=CANCELLED_AND_NOTIFIED)
  474. f5 = create_future(state=FINISHED, exception=IOError())
  475. f6 = create_future(state=FINISHED, result=5)
  476. self.assertTrue(f1.cancel())
  477. self.assertEqual(f1._state, CANCELLED)
  478. self.assertFalse(f2.cancel())
  479. self.assertEqual(f2._state, RUNNING)
  480. self.assertTrue(f3.cancel())
  481. self.assertEqual(f3._state, CANCELLED)
  482. self.assertTrue(f4.cancel())
  483. self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
  484. self.assertFalse(f5.cancel())
  485. self.assertEqual(f5._state, FINISHED)
  486. self.assertFalse(f6.cancel())
  487. self.assertEqual(f6._state, FINISHED)
  488. def test_cancelled(self):
  489. self.assertFalse(PENDING_FUTURE.cancelled())
  490. self.assertFalse(RUNNING_FUTURE.cancelled())
  491. self.assertTrue(CANCELLED_FUTURE.cancelled())
  492. self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
  493. self.assertFalse(EXCEPTION_FUTURE.cancelled())
  494. self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
  495. def test_done(self):
  496. self.assertFalse(PENDING_FUTURE.done())
  497. self.assertFalse(RUNNING_FUTURE.done())
  498. self.assertTrue(CANCELLED_FUTURE.done())
  499. self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
  500. self.assertTrue(EXCEPTION_FUTURE.done())
  501. self.assertTrue(SUCCESSFUL_FUTURE.done())
  502. def test_running(self):
  503. self.assertFalse(PENDING_FUTURE.running())
  504. self.assertTrue(RUNNING_FUTURE.running())
  505. self.assertFalse(CANCELLED_FUTURE.running())
  506. self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
  507. self.assertFalse(EXCEPTION_FUTURE.running())
  508. self.assertFalse(SUCCESSFUL_FUTURE.running())
  509. def test_result_with_timeout(self):
  510. self.assertRaises(futures.TimeoutError,
  511. PENDING_FUTURE.result, timeout=0)
  512. self.assertRaises(futures.TimeoutError,
  513. RUNNING_FUTURE.result, timeout=0)
  514. self.assertRaises(futures.CancelledError,
  515. CANCELLED_FUTURE.result, timeout=0)
  516. self.assertRaises(futures.CancelledError,
  517. CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
  518. self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
  519. self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
  520. def test_result_with_success(self):
  521. # TODO(brian@sweetapp.com): This test is timing dependant.
  522. def notification():
  523. # Wait until the main thread is waiting for the result.
  524. time.sleep(1)
  525. f1.set_result(42)
  526. f1 = create_future(state=PENDING)
  527. t = threading.Thread(target=notification)
  528. t.start()
  529. self.assertEqual(f1.result(timeout=5), 42)
  530. def test_result_with_cancel(self):
  531. # TODO(brian@sweetapp.com): This test is timing dependant.
  532. def notification():
  533. # Wait until the main thread is waiting for the result.
  534. time.sleep(1)
  535. f1.cancel()
  536. f1 = create_future(state=PENDING)
  537. t = threading.Thread(target=notification)
  538. t.start()
  539. self.assertRaises(futures.CancelledError, f1.result, timeout=5)
  540. def test_exception_with_timeout(self):
  541. self.assertRaises(futures.TimeoutError,
  542. PENDING_FUTURE.exception, timeout=0)
  543. self.assertRaises(futures.TimeoutError,
  544. RUNNING_FUTURE.exception, timeout=0)
  545. self.assertRaises(futures.CancelledError,
  546. CANCELLED_FUTURE.exception, timeout=0)
  547. self.assertRaises(futures.CancelledError,
  548. CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
  549. self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
  550. IOError))
  551. self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
  552. def test_exception_with_success(self):
  553. def notification():
  554. # Wait until the main thread is waiting for the exception.
  555. time.sleep(1)
  556. with f1._condition:
  557. f1._state = FINISHED
  558. f1._exception = IOError()
  559. f1._condition.notify_all()
  560. f1 = create_future(state=PENDING)
  561. t = threading.Thread(target=notification)
  562. t.start()
  563. self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
  564. @reap_threads
  565. def test_main():
  566. try:
  567. test_support.run_unittest(ProcessPoolExecutorTest,
  568. ThreadPoolExecutorTest,
  569. ProcessPoolWaitTests,
  570. ThreadPoolWaitTests,
  571. ProcessPoolAsCompletedTests,
  572. ThreadPoolAsCompletedTests,
  573. FutureTests,
  574. ProcessPoolShutdownTest,
  575. ThreadPoolShutdownTest)
  576. finally:
  577. test_support.reap_children()
  578. if __name__ == "__main__":
  579. test_main()