concurrent.core.async package

Submodules

concurrent.core.async.api module

Base interfaces for async task classes

class concurrent.core.async.api.ITaskManager[source]

Bases: concurrent.core.components.component.Interface

Interface that defines what a task manager has to provide

create_task(task_class)[source]

Create a task of the given class. The task manager assigns a unique id to the created task

get_num_workers()[source]

Return the number of workers we use for our processing

get_results_queue()[source]

Return a reference to the result queue

init(identity, address)[source]

Initialize the manager using the provided master server data.

push_task(task)[source]

Push a task that should be completed by the workers

start()[source]

Start our worker processes

stop()[source]

Stop our worker processes

task_finished(task, result, error)[source]

Called once a task has been performed

update_pool(_num_workers)[source]

Set the number of workers the task manager should use

wait_for_all()[source]

Wait until all tasks have finished

class concurrent.core.async.api.ITaskSystem[source]

Bases: object

A task system is an implemented system that provides a scheduler with tasks to be executed

gather_result(master)[source]

Once the system has stated that it has finished, the MasterNode requests the results that are to be sent to the originator. Returns a tuple like (result, Error)

generate_tasks(master)[source]

Generate the initial tasks this system requires

get_system_id()[source]
init_system(master)[source]

Initialize the system

is_complete(master)[source]

Ask the system whether the computation has finished. If not, we will go on and generate more tasks

system_id[source]
task_finished(master, task, result, error)[source]

Called once a task has been performed
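The lifecycle described by ITaskSystem (initialize, generate tasks, receive results, report completion, gather the final result) can be sketched as follows. This is a minimal illustration and not the package's implementation: SumSquaresSystem and run_locally are hypothetical names, tasks are plain tuples instead of Task instances, generate_tasks is assumed to return a list, and the master argument is an unused placeholder.

```python
class SumSquaresSystem(object):
    """Hypothetical task system that sums the squares of some numbers."""

    def __init__(self, numbers):
        self.numbers = list(numbers)
        self.pending = 0
        self.total = 0
        self.error = None

    def init_system(self, master):
        # Reset all bookkeeping before any task is generated.
        self.pending = 0
        self.total = 0
        self.error = None

    def generate_tasks(self, master):
        # Assumption: tasks are returned as a list; here each one is a tuple.
        tasks = [("square", n) for n in self.numbers]
        self.pending = len(tasks)
        return tasks

    def task_finished(self, master, task, result, error):
        # Record one finished task, keeping the last error if any occurred.
        self.pending -= 1
        if error is not None:
            self.error = error
        else:
            self.total += result

    def is_complete(self, master):
        return self.pending == 0

    def gather_result(self, master):
        # Same shape as documented above: (result, error).
        return (self.total, self.error)


def run_locally(system):
    """Stand-in for the scheduler and MasterNode, executed in-process."""
    master = None  # placeholder; a real MasterNode would be passed here
    system.init_system(master)
    for task in system.generate_tasks(master):
        _, n = task
        system.task_finished(master, task, n * n, None)
    if not system.is_complete(master):
        raise RuntimeError("system still has pending tasks")
    return system.gather_result(master)


result, error = run_locally(SumSquaresSystem([1, 2, 3]))
```

In the real system the tasks would be executed by slaves and task_finished would be called as results arrive; the in-process loop only shows the order of the calls.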

class concurrent.core.async.api.ITaskScheduler[source]

Bases: concurrent.core.components.component.Interface

Interface used by our distributed task scheduler. A scheduler receives an implemented system that will be executed on the distributed system through pickling Python instances.

handle_task(task)[source]

Send a task to a slave or, in case that fails, queue the task again

push_task(task)[source]

Put a task on the global task queue

push_tasks(tasks)[source]

Push all tasks on the global task queue

rate_slaves()[source]

Update slaves

setup(master)[source]

Set up the scheduler with the required data

start_system(task_system)[source]

Start an incoming task system

task_finished(task, result, error)[source]

A slave has finished a task; update its rank

class concurrent.core.async.api.ITaskScheduleStrategy[source]

Bases: concurrent.core.components.component.Interface

Implements a schedule strategy to select the next valid slave that should process a given task

get_next_slave()[source]

Get the slave that should process the next task

rate(slave)[source]

Rate a slave without

setup(scheduler, master)[source]

Set up the strategy with the required data
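As an illustration of the three ITaskScheduleStrategy methods, the following sketch implements a simple round-robin strategy. Everything beyond the method names is an assumption: RoundRobinStrategy and FakeMaster are hypothetical, the list of slaves is assumed to be reachable as master.slaves, and rate() is a no-op because round-robin does not need ratings.

```python
class RoundRobinStrategy(object):
    """Hypothetical strategy that hands out slaves in turn."""

    def setup(self, scheduler, master):
        self.scheduler = scheduler
        self.master = master
        self._index = 0

    def rate(self, slave):
        # Round-robin ignores ratings, so there is nothing to update.
        pass

    def get_next_slave(self):
        slaves = self.master.slaves  # assumed attribute, not a documented API
        if not slaves:
            return None
        slave = slaves[self._index % len(slaves)]
        self._index += 1
        return slave


class FakeMaster(object):
    """Stand-in for the MasterNode, holding only a list of slaves."""

    def __init__(self, slaves):
        self.slaves = list(slaves)


strategy = RoundRobinStrategy()
strategy.setup(scheduler=None, master=FakeMaster(["slave-a", "slave-b"]))
picked = [strategy.get_next_slave() for _ in range(3)]
```

A rating-based strategy would instead store a score per slave in rate() and return the best-scored slave from get_next_slave().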

concurrent.core.async.task module

Base class for any of our async tasks that a job is made of.

class concurrent.core.async.task.Task(name, system_id, client_id)[source]

Bases: object

A simple task that just executes a function in a fire-and-forget way

clean_up()[source]

Called once a task has been performed and its results are about to be sent back. This is used to optimize network usage and to clean up the task's input data

finished(result, error)[source]

Called on the MasterNode, within the main thread, once the task is finished and the node has recovered the result data.

name[source]
system_id[source]
task_id[source]
class concurrent.core.async.task.GenericTaskManager(compmgr, init=<function __init__ at 0x0000000004FEDE48>, cls=<class 'concurrent.core.async.task.GenericTaskManager'>)[source]

Bases: concurrent.core.components.component.Component

get_num_workers()[source]

Return the number of workers we use for our processing

get_results_queue()[source]

Return a reference to the result queue

init(identity, address)[source]

Initialize the manager

num_workers

Number of worker processes to be created; -1 will spawn as many as there are physical cores.
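One way the -1 convention could be resolved is sketched below. resolve_num_workers is a hypothetical helper and not part of the package. Note that multiprocessing.cpu_count() reports logical CPUs, which may exceed the number of physical cores on machines with hyper-threading.

```python
import multiprocessing


def resolve_num_workers(num_workers):
    """Translate the configured value into a concrete worker count."""
    if num_workers == -1:
        # Logical CPU count; used here as an approximation of "cores".
        return multiprocessing.cpu_count()
    if num_workers < 1:
        raise ValueError("num_workers must be -1 or a positive integer")
    return num_workers


default_workers = resolve_num_workers(-1)
fixed_workers = resolve_num_workers(4)
```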

push_task(task)[source]

Push a task that should be completed by the workers

start()[source]

Start our worker processes

stop()[source]

Stop our worker processes

task_finished(task, result, error)[source]

Called once a task has been performed

update_pool(_num_workers=-1)[source]

Set the number of workers the task manager should use

wait_for_all()[source]

Wait until all tasks have finished

concurrent.core.async.threads module

Module containing various base thread classes and threading helpers

class concurrent.core.async.threads.InterruptibleThread(log)[source]

Bases: threading.Thread

A thread class that supports raising an exception in the thread from another thread. Based on the class from Bluebird75 [http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python]

raiseExc(exctype)[source]

Raises the given exception type in the context of this thread.

If the thread is busy in a system call (time.sleep(), socket.accept(), ...), the exception is simply ignored.

If you are sure that your exception should terminate the thread, one way to ensure that it works is:

    t = ThreadWithExc( ... )
    ...
    t.raiseExc( SomeException )
    while t.isAlive():
        time.sleep( 0.1 )
        t.raiseExc( SomeException )

If the exception is to be caught by the thread, you need a way to check that your thread has caught it.

CAREFUL: this function is executed in the context of the caller thread, to raise an exception in the context of the thread represented by this instance.
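The technique behind this kind of cross-thread exception is presumably CPython's PyThreadState_SetAsyncExc, reachable through ctypes. The sketch below shows that mechanism in isolation, written for Python 3 on CPython. It is not the package's code: async_raise, Interrupted and worker are hypothetical names. The exception is only delivered once the target thread executes Python bytecode again, which is why the worker sleeps in short intervals.

```python
import ctypes
import threading
import time


class Interrupted(Exception):
    """Hypothetical exception type raised inside the worker thread."""


def async_raise(thread, exctype):
    """Ask CPython to raise exctype in thread; return True if it was scheduled."""
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exctype))
    if res > 1:
        # More than one thread state was affected: undo the request.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(thread.ident), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    return res == 1


started = threading.Event()
caught = []


def worker():
    try:
        started.set()
        while True:
            time.sleep(0.01)  # short sleeps so the exception is noticed quickly
    except Interrupted:
        caught.append(True)


t = threading.Thread(target=worker)
t.start()
started.wait()            # make sure the worker is inside its try block
async_raise(t, Interrupted)
t.join(2.0)
```

The call runs in the caller's thread and only schedules the exception, so the caller still has to join or poll the target thread to learn whether it actually stopped.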

stop_and_wait()[source]

Raises a general exception within the thread and waits for completion. We will try to kill the thread 10 times. If we were not able to kill the thread we return False, otherwise True.

exception concurrent.core.async.threads.ThreadInterruptedError[source]

Bases: exceptions.Exception

Error used when an interruptible thread has been killed

class concurrent.core.async.threads.ReadWriteLock[source]

Bases: object

Read-Write lock class. A read-write lock differs from a standard threading.RLock() by allowing multiple threads to simultaneously hold a read lock, while allowing only a single thread to hold a write lock at the same point of time.

When a read lock is requested while a write lock is held, the reader is blocked; when a write lock is requested while another write lock is held or there are read locks, the writer is blocked.

Writers are always preferred by this implementation: if there are blocked threads waiting for a write lock, current readers may request more read locks (which they eventually should free, as they starve the waiting writers otherwise), but a new thread requesting a read lock will not be granted one, and block. This might mean starvation for readers if two writer threads interweave their calls to acquireWrite() without leaving a window only for readers.

In case a current reader requests a write lock, this can and will be satisfied without giving up the read locks first, but only one thread may perform this kind of lock upgrade, as a deadlock would otherwise occur. After the write lock has been granted, the thread will hold a full write lock, and will not be downgraded after the upgrading call to acquireWrite() has been matched by a corresponding release().

acquireRead(blocking=True, timeout=None)[source]

Acquire a read lock for the current thread, waiting at most timeout seconds or doing a non-blocking check in case timeout is <= 0.

In case timeout is None, the call to acquireRead blocks until the lock request can be serviced.

In case the timeout expires before the lock could be serviced, a RuntimeError is thrown.

acquireWrite(blocking=True, timeout=None)[source]

Acquire a write lock for the current thread, waiting at most timeout seconds or doing a non-blocking check in case timeout is <= 0.

In case the write lock cannot be serviced due to the deadlock condition mentioned above, a ValueError is raised.

In case timeout is None, the call to acquireWrite blocks until the lock request can be serviced.

In case the timeout expires before the lock could be serviced, a RuntimeError is thrown.

readlock[source]
release()[source]

Release the currently held lock.

In case the current thread holds no lock, a ValueError is thrown.

writelock[source]
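The writer-preference rule described for ReadWriteLock can be illustrated with a much smaller lock built on threading.Condition. This is a sketch under simplifying assumptions, not the class documented here: SimpleRWLock is a hypothetical name, and it supports neither timeouts, nor per-thread ownership tracking, nor the read-to-write upgrade.

```python
import threading


class SimpleRWLock(object):
    """Hypothetical writer-preferring read-write lock (no upgrades, no timeouts)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False
        self._waiting_writers = 0

    def acquire_read(self):
        with self._cond:
            # New readers wait while a writer holds or is waiting for the lock.
            while self._writer or self._waiting_writers:
                self._cond.wait()
            self._readers += 1

    def acquire_write(self):
        with self._cond:
            self._waiting_writers += 1
            try:
                while self._writer or self._readers:
                    self._cond.wait()
            finally:
                self._waiting_writers -= 1
            self._writer = True

    def release(self):
        with self._cond:
            if self._writer:
                self._writer = False
            elif self._readers:
                self._readers -= 1
            else:
                raise ValueError("release() called without a held lock")
            self._cond.notify_all()


lock = SimpleRWLock()
lock.acquire_read()
lock.acquire_read()       # a second read lock is granted immediately
events = []


def writer():
    lock.acquire_write()  # blocks until both read locks are released
    events.append("write")
    lock.release()


t = threading.Thread(target=writer)
t.start()
events.append("read")
lock.release()
lock.release()
t.join()
```

Because the writer cannot proceed until both read locks are released, "read" is always recorded before "write".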
class concurrent.core.async.threads.RWLockCache[source]

Bases: object

A simple class used to hold a cache of locks. Locks are accessed as if they were members of the class itself.

concurrent.core.async.threads.makeRequests(callable_, args_list, callback=None, exc_callback=<function _handle_thread_exception at 0x0000000004FA4208>)[source]

Create several work requests for the same callable with different arguments.

Convenience function for creating several work requests for the same callable where each invocation of the callable receives different values for its arguments.

args_list contains the parameters for each invocation of callable. Each item in args_list should be either a 2-item tuple of the list of positional arguments and a dictionary of keyword arguments or a single, non-tuple argument.

See docstring for WorkRequest for info on callback and exc_callback.
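The two accepted shapes of an args_list entry can be illustrated without any threading. The sketch below only mirrors the argument handling described above: make_calls and power are hypothetical names, and functools.partial objects stand in for WorkRequest instances.

```python
import functools


def make_calls(callable_, args_list):
    """Return one zero-argument call per args_list entry."""
    calls = []
    for item in args_list:
        if isinstance(item, tuple):
            # 2-item tuple: (list of positional args, dict of keyword args)
            args, kwds = item
        else:
            # Single non-tuple argument
            args, kwds = [item], {}
        calls.append(functools.partial(callable_, *args, **kwds))
    return calls


def power(base, exponent=2):
    return base ** exponent


args_list = [3, ([2], {"exponent": 5}), ([4, 3], {})]
results = [call() for call in make_calls(power, args_list)]
```

A consequence of this convention is that a tuple can never be passed as a single argument directly; it has to be wrapped, for example as ([my_tuple], {}).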

exception concurrent.core.async.threads.NoResultsPending[source]

Bases: exceptions.Exception

All work requests have been processed.

exception concurrent.core.async.threads.NoWorkersAvailable[source]

Bases: exceptions.Exception

No worker threads available to process remaining requests.

class concurrent.core.async.threads.ThreadPool(num_workers, q_size=0, resq_size=0, poll_timeout=5)[source]

A thread pool, distributing work requests and collecting results.

See the module docstring for more information.

createWorkers(num_workers, poll_timeout=5)[source]

Add num_workers worker threads to the pool.

poll_timeout sets the interval in seconds (int or float) at which threads check whether they have been dismissed while waiting for requests.

dismissWorkers(num_workers, do_join=False)[source]

Tell num_workers worker threads to quit after their current task.

joinAllDismissedWorkers()[source]

Perform Thread.join() on all worker threads that have been dismissed.

poll(block=False)[source]

Process any new results in the queue.

putRequest(request, block=True, timeout=None)[source]

Put work request into work queue and save its id for later.

wait()[source]

Wait for results, blocking until all have arrived.

class concurrent.core.async.threads.WorkRequest(callable_, args=None, kwds=None, requestID=None, callback=None, exc_callback=<function _handle_thread_exception at 0x0000000004FA4208>)[source]

A request to execute a callable for putting in the request queue later.

See the module function makeRequests for the common case where you want to build several WorkRequest objects for the same callable but with different arguments for each call.

class concurrent.core.async.threads.WorkerThread(requests_queue, results_queue, poll_timeout=5, **kwds)[source]

Bases: threading.Thread

Background thread connected to the requests/results queues.

A worker thread sits in the background and picks up work requests from one queue and puts the results in another until it is dismissed.

dismiss()[source]

Sets a flag to tell the thread to exit when done with current job.

run()[source]

Repeatedly process the job queue until told to exit.
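The interplay of the request queue, the result queue, the poll timeout and dismiss() can be sketched with the standard library alone. The example is written for Python 3, where the module is called queue (Queue on Python 2). SketchWorker is a hypothetical, simplified stand-in for WorkerThread: requests are plain (function, argument) pairs and results are (argument, value, error) tuples.

```python
import queue
import threading


class SketchWorker(threading.Thread):
    """Hypothetical worker: takes requests from one queue, puts results on another."""

    def __init__(self, requests, results, poll_timeout=0.1):
        super().__init__()
        self.daemon = True
        self._requests = requests
        self._results = results
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()

    def dismiss(self):
        # Only sets a flag; the thread exits after its current job.
        self._dismissed.set()

    def run(self):
        while not self._dismissed.is_set():
            try:
                func, arg = self._requests.get(True, self._poll_timeout)
            except queue.Empty:
                continue  # nothing to do; re-check the dismissed flag
            try:
                self._results.put((arg, func(arg), None))
            except Exception as exc:
                self._results.put((arg, None, exc))


requests, results = queue.Queue(), queue.Queue()
worker = SketchWorker(requests, results)
worker.start()
for n in (1, 2, 3):
    requests.put((lambda x: x * x, n))
squares = sorted(results.get(timeout=2)[1] for _ in range(3))
worker.dismiss()
worker.join()
```

The poll timeout bounds how long a dismissed worker can stay blocked on an empty request queue, which is why join() returns shortly after dismiss().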

Module contents