Forráskód Böngészése

Use a thread pool that doesn't join workers

By default, a ThreadPoolExecutor will wait at Python interpreter shutdown for
all the threads to stop by themselves before letting the interpreter shut down.

We don't want that for the network requests thread pool, it causes a shutdown
latency if there are outstanding requests. Killing the threads in our pool is
perfectly safe so we can avoid the latency by introducing an
UnsafeThreadPoolExecutor.
Strahinja Val Markovic 11 éve
szülő
commit
7bfb4c3402

+ 2 - 2
python/ycm/client/base_request.py

@@ -23,12 +23,12 @@ import requests
 import urlparse
 from retries import retries
 from requests_futures.sessions import FuturesSession
-from concurrent.futures import ThreadPoolExecutor
+from ycm.unsafe_thread_pool_executor import UnsafeThreadPoolExecutor
 from ycm import vimsupport
 from ycm.server.responses import ServerError, UnknownExtraConf
 
 HEADERS = {'content-type': 'application/json'}
-EXECUTOR = ThreadPoolExecutor( max_workers = 10 )
+EXECUTOR = UnsafeThreadPoolExecutor( max_workers = 10 )
 # Setting this to None seems to screw up the Requests/urllib3 libs.
 DEFAULT_TIMEOUT_SEC = 30
 

+ 119 - 0
python/ycm/unsafe_thread_pool_executor.py

@@ -0,0 +1,119 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+#   Licensed to PSF under a Contributor Agreement.
+#
+# Copyright (C) 2013  Strahinja Val Markovic  <val@markovic.io>
+#   Changes to this file are licensed under the same terms as the original file
+#   (the Python Software Foundation License).
+
+
+from __future__ import with_statement
+import threading
+import weakref
+import sys
+
+from concurrent.futures import _base
+
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+
+
+# This file provides an UnsafeThreadPoolExecutor, which operates exactly like
+# the upstream Python version of ThreadPoolExecutor with one exception: it
+# doesn't wait for worker threads to finish before shutting down the Python
+# interpreter.
+#
+# This is dangerous for many workloads, but fine for some (like when threads
+# only send network requests). The YCM workload is one of those workloads where
+# it's safe (the aforementioned network requests case).
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        if not self.future.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException:
+            e = sys.exc_info()[1]
+            self.future.set_exception(e)
+        else:
+            self.future.set_result(result)
+
+def _worker(executor_reference, work_queue):
+    try:
+        while True:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
+    except BaseException:
+        _base.LOGGER.critical('Exception in worker', exc_info=True)
+
+class UnsafeThreadPoolExecutor(_base.Executor):
+    def __init__(self, max_workers):
+        """Initializes a new ThreadPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of threads that can be used to
+                execute the given calls.
+        """
+        self._max_workers = max_workers
+        self._work_queue = queue.Queue()
+        self._threads = set()
+        self._shutdown = False
+        self._shutdown_lock = threading.Lock()
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
+
+    def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
+        # TODO(bquinlan): Should avoid creating new threads if there are more
+        # idle threads than items in the work queue.
+        if len(self._threads) < self._max_workers:
+            t = threading.Thread(target=_worker,
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
+            t.daemon = True
+            t.start()
+            self._threads.add(t)
+
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
+            self._shutdown = True
+            self._work_queue.put(None)
+        if wait:
+            for t in self._threads:
+                t.join()
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
+