Source code for concurrent.core.async.threads

# -*- coding: utf-8 -*-
"""
Module containing various base thread classes and threading helpers
"""

import ctypes
import threading
import inspect
import time

__all__ = ['InterruptibleThread', 'ThreadInterruptedError', 'ReadWriteLock', 'RWLockCache','makeRequests',
    'NoResultsPending', 'NoWorkersAvailable', 'ThreadPool', 'WorkRequest', 'WorkerThread']

class ThreadInterruptedError(Exception):
    """Exception type injected into an interruptible thread to kill it."""
def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` in the thread whose id is ``tid``.

    ``tid`` is the thread identifier (as reported by ``Thread.ident``) and
    ``exctype`` must be an exception *class*, not an instance.

    Raises ``TypeError`` if ``exctype`` is not a class, ``ValueError`` if no
    thread with the given id is known to the interpreter and ``SystemError``
    if the interpreter reports that more than one thread state was modified.
    """
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    # Pass the id explicitly as a C long. A bare Python int is marshalled by
    # ctypes as a C int, which truncates thread ids on 64-bit platforms and
    # makes the lookup fail.
    thread_id = ctypes.c_long(tid)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id,
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        # None is what ctypes converts to a NULL pointer.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
[docs]class InterruptibleThread(threading.Thread): '''A thread class that supports raising exception in the thread from another thread. Based on the class from Bluebird75 [http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python] ''' def __init__(self, log): threading.Thread.__init__(self, name=self.__class__.__name__) self.log = log self.daemon = True def _get_my_tid(self): """determines this (self's) thread id CAREFUL : this function is executed in the context of the caller thread, to get the identity of the thread represented by this instance. """ if not self.isAlive(): raise threading.ThreadError("the thread is not active") # do we have it cached? if hasattr(self, "_thread_id"): return self._thread_id # no, look for it in the _active dict for tid, tobj in threading._active.items(): if tobj is self: self._thread_id = tid return tid # TODO: in python 2.6, there's a simpler way to do : self.ident raise AssertionError("could not determine the thread's id")
[docs] def raiseExc(self, exctype): """Raises the given exception type in the context of this thread. If the thread is busy in a system call (time.sleep(), socket.accept(), ...), the exception is simply ignored. If you are sure that your exception should terminate the thread, one way to ensure that it works is: t = ThreadWithExc( ... ) ... t.raiseExc( SomeException ) while t.isAlive(): time.sleep( 0.1 ) t.raiseExc( SomeException ) If the exception is to be caught by the thread, you need a way to check that your thread has caught it. CAREFUL : this function is executed in the context of the caller thread, to raise an excpetion in the context of the thread represented by this instance. """ _async_raise( self._get_my_tid(), exctype )
[docs] def stop_and_wait(self): """Raises a general exception within the thread and waits uppon completion. We will try to kill the thread 10 times. If we where not able to kill the thread we return False, other wise True. """ attempts = 10 self.raiseExc( AssertionError ) while self.isAlive() and attempts > 0: time.sleep( 0.1 ) self.raiseExc( ThreadInterruptedError ) attempts -= 1 # If the thread is alive we failed to kill it! if self.isAlive(): self.log.warn("Failed to kill thread %s" % str(self)) return False return True
class RWLockCache(object):
    """
    Lazily populated collection of named read-write locks.

    Any attribute that is not found through normal lookup is treated as a
    lock name: the matching ``ReadWriteLock`` is returned from ``cache`` and
    created on first access.
    """

    def __init__(self):
        object.__init__(self)
        # Missing keys are materialised as fresh ReadWriteLock instances.
        self.cache = defaultdict(ReadWriteLock)

    def __getattr__(self, lock_name):
        # Only invoked when regular attribute lookup fails.
        locks = self.cache
        return locks[lock_name]
""" rwlock.py - Read-Write lock thread lock implementation From http://code.activestate.com/recipes/502283-read-write-lock-class-rlock-like/ It features the modifications proposed by Jim Pryor (see the snippets comments) See the class documentation for more info. Copyright (C) 2007, Heiko Wundram. Released under the BSD-license. Other modifications done for 'Concurrent' are considered under MIT license. """ from threading import Condition, Lock, currentThread from contextlib import contextmanager from collections import defaultdict
class ReadWriteLock(object):
    """ Read-Write lock class. A read-write lock differs from a standard
    threading.RLock() by allowing multiple threads to simultaneously hold a
    read lock, while allowing only a single thread to hold a write lock at
    the same point of time.

    When a read lock is requested while a write lock is held, the reader
    is blocked; when a write lock is requested while another write lock is
    held or there are read locks, the writer is blocked.

    Writers are always preferred by this implementation: if there are
    blocked threads waiting for a write lock, current readers may request
    more read locks (which they eventually should free, as they starve the
    waiting writers otherwise), but a new thread requesting a read lock
    will not be granted one, and block. This might mean starvation for
    readers if two writer threads interweave their calls to acquireWrite()
    without leaving a window only for readers.

    In case a current reader requests a write lock, this can and will be
    satisfied without giving up the read locks first, but, only one thread
    may perform this kind of lock upgrade, as a deadlock would otherwise
    occur. After the write lock has been granted, the thread will hold a
    full write lock, and not be downgraded after the upgrading call to
    acquireWrite() has been matched by a corresponding release().
    """

    def __init__(self):
        """Initialize this read-write lock."""
        # Condition variable, used to signal waiters of a change in object
        # state.
        self.__condition = Condition(Lock())

        # Initialize with no writers.
        self.__writer = None
        # Nesting depth of the current writer; meaningful only while
        # __writer is set, initialised so the attribute always exists.
        self.__writercount = 0
        # Read-lock depth parked by a reader that waits for an upgrade.
        self.__upgradewritercount = 0
        # FIFO of threads waiting for a (non-upgrade) write lock.
        self.__pendingwriters = []

        # Initialize with no readers (maps thread -> read nesting depth).
        self.__readers = {}

    def acquireRead(self, blocking=True, timeout=None):
        """ Acquire a read lock for the current thread, waiting at most
        timeout seconds or doing a non-blocking check in case timeout is
        <= 0 or ``blocking`` is false.

        In case timeout is None, the call to acquireRead blocks until the
        lock request can be serviced.

        In case the timeout expires before the lock could be serviced (or
        a non-blocking check fails), a RuntimeError is thrown.
        """
        if not blocking:
            # A deadline in the past turns the wait into a single check.
            endtime = -1
        elif timeout is not None:
            endtime = time.time() + timeout
        else:
            endtime = None

        me = threading.current_thread()
        with self.__condition:
            if self.__writer is me:
                # If we are the writer, grant a new read lock, always.
                self.__writercount += 1
                return
            while True:
                if self.__writer is None:
                    # Only test anything if there is no current writer.
                    if self.__upgradewritercount or self.__pendingwriters:
                        if me in self.__readers:
                            # Only grant a read lock if we already have one
                            # in case writers are waiting for their turn.
                            # This means that writers can't easily get
                            # starved (but see below, readers can).
                            self.__readers[me] += 1
                            return
                        # No, we aren't a reader (yet), wait for our turn.
                    else:
                        # Grant a new read lock, always, in case there are
                        # no pending writers (and no writer).
                        self.__readers[me] = self.__readers.get(me, 0) + 1
                        return
                if endtime is not None:
                    remaining = endtime - time.time()
                    if remaining <= 0:
                        # Timeout has expired, signal caller of this.
                        raise RuntimeError("Acquiring read lock timed out")
                    self.__condition.wait(remaining)
                else:
                    self.__condition.wait()

    def acquireWrite(self, blocking=True, timeout=None):
        """ Acquire a write lock for the current thread, waiting at most
        timeout seconds or doing a non-blocking check in case timeout is
        <= 0 or ``blocking`` is false.

        In case the write lock cannot be serviced due to the deadlock
        condition mentioned above, a ValueError is raised.

        In case timeout is None, the call to acquireWrite blocks until the
        lock request can be serviced.

        In case the timeout expires before the lock could be serviced (or
        a non-blocking check fails), a RuntimeError is thrown.
        """
        if not blocking:
            endtime = -1
        elif timeout is not None:
            endtime = time.time() + timeout
        else:
            endtime = None

        me, upgradewriter = threading.current_thread(), False
        with self.__condition:
            if self.__writer is me:
                # If we are the writer, grant a new write lock, always.
                self.__writercount += 1
                return
            elif me in self.__readers:
                # If we are a reader, no need to add us to pendingwriters,
                # we get the upgradewriter slot.
                if self.__upgradewritercount:
                    # If we are a reader and want to upgrade, and someone
                    # else also wants to upgrade, there is no way we can do
                    # this except if one of us releases all his read locks.
                    # Signal this to user.
                    raise ValueError(
                        "Inevitable dead lock, denying write lock"
                        )
                upgradewriter = True
                self.__upgradewritercount = self.__readers.pop(me)
            else:
                # We aren't a reader, so add us to the pending writers queue
                # for synchronization with the readers.
                self.__pendingwriters.append(me)
            while True:
                if not self.__readers and self.__writer is None:
                    # Only test anything if there are no readers and writers.
                    if self.__upgradewritercount:
                        if upgradewriter:
                            # There is a writer to upgrade, and it's us. Take
                            # the write lock.
                            self.__writer = me
                            self.__writercount = self.__upgradewritercount + 1
                            self.__upgradewritercount = 0
                            return
                        # There is a writer to upgrade, but it's not us.
                        # Always leave the upgrade writer the advance slot,
                        # because he presumes he'll get a write lock directly
                        # from a previously held read lock.
                    elif self.__pendingwriters[0] is me:
                        # If there are no readers and writers, it's always
                        # fine for us to take the writer slot, removing us
                        # from the pending writers queue.
                        # This might mean starvation for readers, though.
                        self.__writer = me
                        self.__writercount = 1
                        self.__pendingwriters = self.__pendingwriters[1:]
                        return
                if endtime is not None:
                    remaining = endtime - time.time()
                    if remaining <= 0:
                        # Timeout has expired, roll back our reservation.
                        if upgradewriter:
                            # Put us back on the reader queue.
                            self.__readers[me] = self.__upgradewritercount
                            self.__upgradewritercount = 0
                        else:
                            # We were a simple pending writer, just remove us
                            # from the FIFO list.
                            self.__pendingwriters.remove(me)
                        # Readers that were held back only because of our
                        # reservation can proceed now; wake them so they do
                        # not sleep until some unrelated release happens.
                        self.__condition.notify_all()
                        raise RuntimeError("Acquiring write lock timed out")
                    self.__condition.wait(remaining)
                else:
                    self.__condition.wait()

    def release(self):
        """ Release the currently held lock.

        In case the current thread holds no lock, a ValueError is thrown.
        """
        me = threading.current_thread()
        with self.__condition:
            if self.__writer is me:
                # We are the writer, take one nesting depth away.
                self.__writercount -= 1
                if not self.__writercount:
                    # No more write locks; take our writer position away and
                    # notify waiters of the new circumstances.
                    self.__writer = None
                    self.__condition.notify_all()
            elif me in self.__readers:
                # We are a reader currently, take one nesting depth away.
                self.__readers[me] -= 1
                if not self.__readers[me]:
                    # No more read locks, take our reader position away.
                    del self.__readers[me]
                    if not self.__readers:
                        # No more readers, notify waiters of the new
                        # circumstances.
                        self.__condition.notify_all()
            else:
                raise ValueError("Trying to release unheld lock")

    @property
    @contextmanager
    def readlock(self):
        """Context manager holding a read lock: ``with lock.readlock: ...``"""
        self.acquireRead()
        try:
            yield
        finally:
            self.release()

    @property
    @contextmanager
    def writelock(self):
        """Context manager holding a write lock: ``with lock.writelock: ...``"""
        self.acquireWrite()
        try:
            yield
        finally:
            self.release()
"""Easy to use object-oriented thread pool framework. A thread pool is an object that maintains a pool of worker threads to perform time consuming operations in parallel. It assigns jobs to the threads by putting them in a work request queue, where they are picked up by the next available thread. This then performs the requested operation in the background and puts the results in another queue. The thread pool object can then collect the results from all threads from this queue as soon as they become available or after all threads have finished their work. It's also possible, to define callbacks to handle each result as it comes in. The basic concept and some code was taken from the book "Python in a Nutshell, 2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section 14.5 "Threaded Program Architecture". I wrapped the main program logic in the ThreadPool class, added the WorkRequest class and the callback system and tweaked the code here and there. Kudos also to Florent Aide for the exception handling mechanism. Basic usage:: >>> pool = ThreadPool(poolsize) >>> requests = makeRequests(some_callable, list_of_args, callback) >>> [pool.putRequest(req) for req in requests] >>> pool.wait() See the end of the module code for a brief, annotated usage example. MIT License Website : http://chrisarndt.de/projects/threadpool/ """ # standard library modules import sys import threading import Queue import traceback # exceptions
class NoResultsPending(Exception):
    """Raised when every submitted work request has already been processed."""
class NoWorkersAvailable(Exception):
    """Raised when requests remain but no worker thread is left to run them."""


# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Fallback exception callback for work requests.

    ``request`` is accepted for signature compatibility and not used;
    ``exc_info`` is unpacked straight into ``traceback.print_exception``,
    which prints the traceback.
    """
    traceback.print_exception(*exc_info)


# utility functions
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Build one ``WorkRequest`` per entry of ``args_list`` for ``callable_``.

    Every entry of ``args_list`` describes the arguments of one invocation:
    a tuple is read as ``(positional_args, keyword_args)`` using its first
    two items, anything else is passed as the single positional argument.

    ``callback`` and ``exc_callback`` are handed unchanged to every
    ``WorkRequest``; see its docstring for their meaning.

    Returns the list of created requests, in the order of ``args_list``.
    """
    def build(item):
        if isinstance(item, tuple):
            positional, keywords = item[0], item[1]
        else:
            positional, keywords = [item], None
        return WorkRequest(callable_, positional, keywords,
                           callback=callback, exc_callback=exc_callback)

    return [build(item) for item in args_list]


# classes
class WorkerThread(threading.Thread):
    """Daemon thread that moves work from a request queue to a result queue.

    The thread keeps taking work requests from one queue, runs them and
    puts the outcome on the other queue, until it has been dismissed.
    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Configure the thread as a daemon and start it right away.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread. ``poll_timeout`` is the number of seconds a
        single wait for a request may last. Extra keyword arguments go to
        ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Process requests from the queue until the thread is dismissed."""
        # Checking the flag on every iteration is what lets a dismissed
        # thread leave after at most one poll interval.
        while not self._dismissed.is_set():
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                # Nothing arrived within the poll interval; re-check flag.
                continue

            if self._dismissed.is_set():
                # Dismissed while waiting: hand the request back and leave.
                self._requests_queue.put(request)
                break

            try:
                result = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, result))
            except:
                # Deliberately catches everything so that a failing job is
                # always reported on the results queue.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Flag the thread so that it exits once its current job is done."""
        self._dismissed.set()
class WorkRequest:
    """A unit of work: a callable plus its arguments and result callbacks.

    See the module function ``makeRequests`` for the common case where you
    want to build several ``WorkRequest`` objects for the same callable but
    with different arguments for each call.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Store the callable, its arguments and the callbacks.

        ``callable_`` is what a worker thread executes, with ``args`` as
        positional arguments (empty list when falsy) and ``kwds`` as keyword
        arguments (empty dict when falsy).

        ``callback``, if given, is called when the result of the request is
        picked up from the result queue. It must accept two anonymous
        arguments, the ``WorkRequest`` object and the result of the
        callable, in that order. Additional information for the callback
        can simply be attached to the request object.

        ``exc_callback`` is used instead when the callable raised. It also
        receives two anonymous arguments, the ``WorkRequest`` and the tuple
        returned by ``sys.exc_info()``. The default implementation just
        prints the exception info via ``traceback.print_exception``; pass
        ``None`` to have no exception handler callback.

        ``requestID``, if given, must be hashable since its hash is used by
        the ``ThreadPool`` object as dictionary key for the request. When
        omitted, ``id(self)`` is used.

        Raises ``TypeError`` if ``requestID`` is not hashable.
        """
        if requestID is not None:
            try:
                request_key = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            request_key = id(self)
        self.requestID = request_key

        # Set to True by the worker when the callable raised.
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args if args else []
        self.kwds = kwds if kwds else {}

    def __str__(self):
        details = (self.requestID, self.args, self.kwds, self.exception)
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % details
class ThreadPool:
    """Pool of worker threads sharing one request and one result queue.

    See the module docstring for more information.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Create both queues and start ``num_workers`` worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited
        and the thread pool blocks when the queue is full and it tries to
        put more work requests in it (see ``putRequest`` method), unless you
        also use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and
        the worker threads will block when the queue is full and they try
        to put new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there
            is the possibility of a deadlock, when the results queue is not
            pulled regularly and too many jobs are put in the work requests
            queue. To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # Maps requestID -> request for everything not yet polled.
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Start ``num_workers`` additional worker threads.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while
        waiting for requests.
        """
        for _ in range(num_workers):
            worker = WorkerThread(self._requests_queue, self._results_queue,
                                  poll_timeout=poll_timeout)
            self.workers.append(worker)

    def dismissWorkers(self, num_workers, do_join=False):
        """Ask up to ``num_workers`` workers to quit after their current task.

        With ``do_join`` the call waits for those threads to finish;
        otherwise they are remembered in ``dismissedWorkers``.
        """
        count = min(num_workers, len(self.workers))
        dismissed = []
        for _ in range(count):
            worker = self.workers.pop()
            worker.dismiss()
            dismissed.append(worker)

        if not do_join:
            self.dismissedWorkers.extend(dismissed)
            return
        for worker in dismissed:
            worker.join()

    def joinAllDismissedWorkers(self):
        """Join every remembered dismissed worker, then forget them all."""
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Queue ``request`` for execution and register it by its id.

        ``block`` and ``timeout`` are passed on to the queue's ``put``.
        """
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Dispatch available results to the callbacks of their requests.

        Raises ``NoResultsPending`` once no registered request is left and
        ``NoWorkersAvailable`` if ``block`` is true while the pool has no
        workers. Returns when a non-blocking read finds the results queue
        empty.
        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            if block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any, unless the exception
                # callback was responsible for this request
                if request.callback:
                    error_handled = request.exception and request.exc_callback
                    if not error_handled:
                        request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Block until the results of all registered requests were handled."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                return