# Source code for concurrent.core.async.api

# -*- coding: utf-8 -*-
"""
Base interfaces for async task classes
"""
from concurrent.core.components.component import Interface

import uuid

# Public names exported by a star-import of this module.
__all__ = ['ITaskManager', 'ITaskSystem', 'ITaskScheduler', 'ITaskScheduleStrategy']

class ITaskManager(Interface):
    """
    Interface used to define what a task manager is for us.

    The methods are declared without ``self``; presumably the ``Interface``
    machinery only uses them as signature declarations -- confirm against
    ``concurrent.core.components.component``.
    """

    def init(identity, address):
        """
        Initialize the manager using the provided master server data.
        """

    def get_num_workers():
        """
        Return the number of workers we use for our processing.
        """

    def start():
        """
        Start our worker processes.
        """

    def stop():
        """
        Stop our worker processes.
        """

    def update_pool(_num_workers):
        """
        Set the number of workers the task manager should use.
        """

    def push_task(task):
        """
        Push a task that should be completed by the workers.
        """

    def wait_for_all():
        """
        Wait until all tasks have been finished.
        """

    def get_results_queue():
        """
        Return a reference to the result queue.
        """

    def create_task(task_class):
        """
        Create a task using a given type. The task manager will assign a
        unique id to the created task.
        """

    def task_finished(task, result, error):
        """
        Called once a task has been performed.
        """
        # TODO: If a TaskSystem finishes, how can we delete remaining tasks
        # if any? How can they be poisoned?
class ITaskSystem(object):
    """
    A task system is an implemented system that provides a scheduler with
    tasks to be executed.

    Every method except the constructor and the ``system_id`` property raises
    ``NotImplementedError`` and is meant to be overridden by concrete systems.
    """

    def __init__(self):
        """
        Default constructor used to initialize the base values. The ctor is
        executed on the ApplicationNode and not called on the MasterNode so
        we can use it to initialize values.
        """
        # Unique identifier of this system instance (time/host based UUID).
        self._system_id = uuid.uuid1()

    @property
    def system_id(self):
        """
        Return the unique id that was assigned to this system on creation.
        """
        return self._system_id

    def init_system(self, master):
        """
        Initialize the system.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError("init_system(self, master) not implemented!")

    def get_system_id(self):
        """
        Return the id of this system.

        Raises NotImplementedError unless overridden.

        NOTE(review): the ``system_id`` property already exposes the id
        generated in the constructor; it looks like this accessor could
        simply return it -- confirm what callers expect before changing.
        """
        raise NotImplementedError("get_system_id(self) not implemented!")

    def generate_tasks(self, master):
        """
        Generate the initial tasks this system requires.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError("generate_tasks(self, master) not implemented!")

    def task_finished(self, master, task, result, error):
        """
        Called once a task has been performed.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError("task_finished(self, master, task, result, error) not implemented!")

    def gather_result(self, master):
        """
        Once the system stated that it has finished, the MasterNode will
        request the required results that are to be sent to the originator.
        Returns a tuple like (result, Error).

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError("gather_result(self, master) not implemented!")

    def is_complete(self, master):
        """
        Ask the system if the computation has finished. If not, we will go
        on and generate more tasks.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError("is_complete(self, master) not implemented!")
class ITaskScheduleStrategy(Interface):
    """
    Implements a schedule strategy to select the next valid slave that should
    process a given task.
    """

    def setup(scheduler, master):
        """
        Set up the strategy with required data.
        """

    def rate(slave):
        """
        Rate a slave.

        NOTE(review): the original docstring was truncated ("Rate a slave
        without"); the intended qualifier is unknown -- confirm with the
        implementations.
        """

    def get_next_slave():
        """
        Get the slave that should process the next task.
        """
class ITaskScheduler(Interface):
    """
    Interface used by our distributed task scheduler. A scheduler receives an
    implemented system that will be executed on the distributed system
    through pickling Python instances.
    """

    def setup(master):
        """
        Set up the scheduler with required data.
        """

    def rate_slaves():
        """
        Update slaves.
        """

    def start_system(task_system):
        """
        Start an incoming task system.
        """

    def push_tasks(tasks):
        """
        Push all tasks on the global task queue.
        """

    def push_task(task):
        """
        Put a task on the global task queue.
        """

    def handle_task(task):
        """
        Send a task to a slave or, in case it failed, queue the task back.
        """

    def _tasked_pushed(slave_id):
        """
        A slave has acquired a new task, update its rank.
        """

    def task_finished(task, result, error):
        """
        A slave has finished a new task, update its rank.
        """