# -*- coding: utf-8 -*-
"""
Base class for any of our async tasks that a job is made of.
"""
__all__ = ['Task', 'GenericTaskManager', 'GenericTaskScheduler']
from concurrent.core.components.component import Component, implements
from concurrent.core.async.api import ITaskManager, ITaskScheduler, ITaskScheduleStrategy
from concurrent.core.async.threads import InterruptibleThread
from concurrent.core.config.config import IntItem, ExtensionPointItem, HostItem
from concurrent.core.application.api import NodeType, NodeState
from concurrent.core.util.stats import Stats
from concurrent.core.util.utils import tprint
from concurrent.core.transport.tcpsocket import send_to_zmq_zipped, receive_from_zmq_zipped, TCPSocketZMQ
from bunch import Bunch
from collections import OrderedDict
import traceback
import urllib2
import multiprocessing
import threading
import zmq
import os
import time
import uuid
import sys
# Port used by our worker processes when using zmq instead of queues for best performance
WORKER_PORT = 5557
# http://glitterbug.in/blog/task-queuing-in-django-with-zeromq-5/show/
# http://zguide.zeromq.org/py:rtreq
# http://www.jeffknupp.com/blog/2014/02/11/a-celerylike-python-task-queue-in-55-lines-of-code/
# Or back to queues?
# We use zipped pickles because they transmit way faster!
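#
# Illustrative sketch only (not used by this module): the real helpers live in
# concurrent.core.transport.tcpsocket. Assuming "zipped pickle" simply means a
# zlib-compressed pickle payload, a send/receive pair typically looks like:
def _example_send_zipped_pickle(socket, obj, flags=0, protocol=2):
    """Pickle an object, compress it with zlib and send it over a ZMQ socket."""
    import pickle, zlib
    socket.send(zlib.compress(pickle.dumps(obj, protocol)), flags)

def _example_recv_zipped_pickle(socket, flags=0):
    """Receive a zlib-compressed pickle from a ZMQ socket and rebuild the object."""
    import pickle, zlib
    return pickle.loads(zlib.decompress(socket.recv(flags)))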
class Task(object):
    """
    A simple task that executes a function in a fire-and-forget way
    """
def __init__(self, name, system_id, client_id):
"""
Initialize the task itself
"""
self._name = name
self._id = str(uuid.uuid1())
# The system ID is the ID of the system that owns the task. We use this
# ID to send the task back to its owner once finished
self._system_id = system_id
# The slave ID is the ID of the slave that processed the task
self.slave_id = None
        # The client ID is set when a client pushes a task onto the framework
self.client_id = client_id
    @property
    def name(self):
        return self._name

    @property
    def task_id(self):
        return self._id

    @property
    def system_id(self):
        return self._system_id
def __call__(self):
"""
        Execute the task
"""
raise NotImplementedError("Subclasses should implement this!")
    def clean_up(self):
        """
        Called once a task has been performed and its results are about to be sent back.
        Used to reduce network traffic by cleaning up the task's input data
        """
pass
    def finished(self, result, error):
        """
        Called on the MasterNode within the main thread once the task has finished
        and the node has recovered the result data.
        """
raise NotImplementedError("Subclasses should implement this!")
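
# Illustrative sketch only: a minimal Task subclass showing the expected
# contract. __call__ runs on a slave worker process and its return value is
# shipped back; finished() then runs on the MasterNode with (result, error).
# 'ExampleSquareTask' and its 'value' payload are made-up demonstration names.
class ExampleSquareTask(Task):
    """Example task that squares a number on a worker"""
    def __init__(self, name, system_id, client_id, value):
        Task.__init__(self, name, system_id, client_id)
        self.value = value
    def __call__(self):
        # Executed on the worker process
        return self.value * self.value
    def finished(self, result, error):
        # Executed on the master once the result has been recovered
        if error is None:
            print("%s: %d^2 = %d" % (self.name, self.value, result))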
class TaskProcess(multiprocessing.Process):
"""
A TaskProcess is the base consumer class of work orders
"""
def __init__(self, result_queue, process_id, task_queue, identity, host, port):
multiprocessing.Process.__init__(self)
self._id = process_id
self._pid = 0
self.result_queue = result_queue
self.task_queue = task_queue
self.identity = identity
self.host = host
self.port = port
# Create logger for each process!
def log(self, msg):
tprint("[%s][%d] %s" % (self.__class__.__name__, self._pid, msg))
def run(self):
self._pid = os.getpid()
#self.context = zmq.Context()
#self.work_receiver = context.socket(zmq.PULL)
#self.work_receiver.connect("tcp://127.0.0.1:%d" % WORKER_PORT)
# Create sockets to communicate with master, this way we optimize our resources
        self.socket = TCPSocketZMQ("{}_{}".format(self._id, self.identity), self.host, self.port)
self.socket.connect()
self.stats = Stats.getInstance()
self.log("Running")
while True:
try:
#next_task = receive_from_zmq_zipped(self.work_receiver)
next_task = self.task_queue.get()
if next_task is None:
# A None task is used to shut us down
self.task_queue.task_done()
break
result = None
try:
#start = time.time()
result = next_task()
self.task_queue.task_done()
#ellapsed = time.time() - start
error = None
#self.stats.add_avg("task-time",ellapsed)
#self.log("Finished [%s:%s]" % (next_task.name, next_task.task_id))
except Exception as err:
result = None
error = err
finally:
#self.result_queue.put(Bunch({'task':next_task,'result':result,'error':error}))
#print("sending back")
next_task.clean_up()
self.socket.send_to('task_finished', next_task, result, error)
except KeyboardInterrupt:
self.log("Keyboard interrupt received, exiting!")
break
self.socket.close()
#self.work_receiver.close()
#self.context.term()
self.log("Exiting")
class GenericTaskManager(Component):
    """
    Simple task manager used in simple single job applications
    """
    implements(ITaskManager)
    num_workers = IntItem('GenericTaskManager', 'num_workers', -1,
        """Number of worker processes to be created; -1 will spawn as many as there are physical cores.""")
def __init__(self, *args, **kwargs):
Component.__init__(self, *args, **kwargs)
# Initialize base manager stuff
self._num_workers = 0
self.results = multiprocessing.JoinableQueue()
    def init(self, identity, address):
"""
Initialize the manager
"""
self.identity = identity
self.host = address[0]
self.port = address[1]
self._num_workers = self.num_workers
if self._num_workers <= 0:
self._num_workers = multiprocessing.cpu_count()
# We now prepare our queues, both the joinable and the results
# queues. Then we just create a process for each worker
self.tasks = multiprocessing.JoinableQueue()
self.processes = [TaskProcess(self.results, i, self.tasks, self.identity, self.host, self.port) for i in range(self._num_workers)]
#self.processes = [TaskProcess(self.results, i) for i in range(self._num_workers)]
context = zmq.Context()
self.ventilator_send = context.socket(zmq.PUSH)
self.ventilator_send.bind("tcp://127.0.0.1:%d" % WORKER_PORT)
    def get_num_workers(self):
"""
Return the number of workers we use for our processing
"""
return self._num_workers
    def start(self):
"""
Start our worker processes
"""
for worker in self.processes:
worker.daemon = True
worker.start()
    def stop(self):
        """
        Stop our worker processes
        """
        for i in xrange(self._num_workers):
            #send_to_zmq_zipped(self.ventilator_send, None)
            self.tasks.put(None)
# Poison for result listener
self.results.put(None)
    def update_pool(self, _num_workers=-1):
        """
        Set the number of workers the task manager should use
        """
        self.stop()
        # init() expects identity and address, not a worker count; shadow the
        # configured pool size on the instance (assumption: instance assignment
        # overrides the IntItem default) and re-initialize with our stored data
        self.num_workers = _num_workers
        self.init(self.identity, (self.host, self.port))
        self.start()
    def push_task(self, task):
        """
        Push a task that should be completed by the workers
        """
        try:
            #send_to_zmq_zipped(self.ventilator_send, task)
            self.tasks.put(task)
        except Exception:
            traceback.print_exc()
        return True
    def wait_for_all(self):
        """
        Wait until all tasks have been finished
        """
        pass
    def get_results_queue(self):
        """
        Return a reference to the result queue
        """
        return self.results
    def task_finished(self, task, result, error):
"""
Called once a task has been performed
"""
task.finished(result, error)
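
# Illustrative sketch only: the typical lifecycle of a GenericTaskManager as a
# node would drive it ('manager' is assumed to come from the component system,
# 'my_task' from a Task subclass such as ExampleSquareTask above):
#
#   manager.init('node-identity', ('127.0.0.1', 8080))
#   manager.start()                      # spawn the worker processes
#   manager.push_task(my_task)           # queued, picked up by a TaskProcess
#   ...
#   manager.stop()                       # poison pills shut the workers down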
class GenericTaskScheduleStrategy(Component):
    """
    Implements a schedule strategy to select the next valid slave that should process a given task.
    We update the workers and rank them so that we give a slave with idle workers a better
    rating than a worker with pending work. Ratings are task/worker ratios, so lower is better:
    - If rating < 1 => Slave has idle processes
    - If rating == 1 => Slave currently has as many tasks as processes
    - If rating > 1 => Slave currently has more tasks than workers
    """
    implements(ITaskScheduleStrategy)
def setup(self, scheduler, master):
self.scheduler = scheduler
self.master = master
def rate(self, slave):
"""
        Rate a slave without any lock. Lower is better.
"""
# Worst rating if we have a slave without workers!
if slave.workers < 1:
return sys.float_info.max
# Basic rating is the task/worker ratio
rating = max(0, slave.tasks / (slave.workers * 1.0))
# TODO: Add task finished per second ratio
return rating
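    # Worked example for rate(): a slave with 4 workers and 6 queued tasks
    # rates 6 / 4.0 = 1.5 (overloaded), while one with 4 workers and 2 tasks
    # rates 2 / 4.0 = 0.5 and wins the comparison in get_next_slave() below.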
def get_next_slave(self):
"""
        Get the slave that should process the next task, picking the one with the best (lowest) rating
"""
if self.master.node_registry:
# Find the best score
best_rating = sys.float_info.max
best_node = None
for node in self.master.node_registry:
if self.master.node_registry[node] and self.master.node_registry[node].state == NodeState.active \
and self.master.node_registry[node].type == NodeType.slave \
and self.master.node_registry[node].rating < best_rating:
best_rating = self.master.node_registry[node].rating
best_node = node
# Get all nodes with the same or similar score (some nice epsilon?)
#Nodes ...
#for node in self.master.node_registry:
# if node and node.rating == best_rating:
# best_rating = node.rating
            # Get a random slave from the first 10% of slaves. This will give us a bit
            # of randomness in case sending the tasks fails
# For now we just use the best node
if best_node:
return best_node
return None
class schedule_thread(InterruptibleThread):
    """
    The schedule thread is responsible for picking up tasks from the global task queue and sending them to a slave.
    """
def __init__(self, log, task_queue, callback):
InterruptibleThread.__init__(self, log)
self.task_queue = task_queue
self.callback = callback
def run(self):
while True:
task = self.task_queue.get()
if task is None:
# A None task is used to shut us down
self.task_queue.task_done()
break
self.task_queue.task_done()
# Send task to one of our slaves
self.callback(task)
self.log.info("schedule_thread exiting")
def stop(self):
try:
self.task_queue.put(None)
self.join()
except:
pass
self.log.info("schedule_thread stopped")
class GenericTaskScheduler(Component):
    """
    Interface used by our distributed task scheduler. A scheduler receives an implemented system
    that will be executed on the distributed system by pickling Python instances.
    """
    implements(ITaskScheduler)
    strategy = ExtensionPointItem('generictaskscheduler', 'strategy', ITaskScheduleStrategy, 'GenericTaskScheduleStrategy',
        """Task schedule strategy used to decide which slave executes a given task""")
def __init__(self):
Component.__init__(self)
self.stats = Stats.getInstance()
# Map that maps tasks and slaves to be able to resend the tasks if the slave was deleted from the system
self.task_map = {}
def setup(self, master):
self.master = master
self.lock = self.master.registry_lock
        # This is the global system's task queue. Every time we add tasks we will add them to this queue.
        # The global queue is where the current strategy will pick up tasks and decide which ones shall
        # be sent to a slave to be processed (this is done from a thread that waits on the queue).
        # If a new task gets added or a task gets completed we will notify the strategy, which then
        # decides to pick up and process a new task.
self.tasks = multiprocessing.JoinableQueue()
        # Schedule thread which will pick up processable tasks and send them to a suitable slave
self.schedule_thread = schedule_thread(self.log, self.tasks, self.handle_task)
self.schedule_thread.start()
# Do not pass the lock to the strategy, we have to ensure we handle locks for it
self.strategy.setup(self, self.master)
def stop(self):
self.schedule_thread.stop()
def _valid_id_no_lock(self, slave_id):
"""
Check if slave id is pointing to a valid slave without any lock
"""
return slave_id in self.master.node_registry and self._valid_slave_no_lock(self.master.node_registry[slave_id])
def _valid_slave_no_lock(self, slave):
"""
Check if a slave is valid without using any locks
"""
return slave and slave.type == NodeType.slave and slave.state == NodeState.active
    def rate_slaves(self):
        """
        Update the rating of all valid slaves
        """
with self.lock.writelock:
start = time.time()
for slave_id in self.master.node_registry:
if self._valid_slave_no_lock(self.master.node_registry[slave_id]):
self.master.node_registry[slave_id].rating = self.strategy.rate(self.master.node_registry[slave_id])
            elapsed = time.time() - start
            self.stats.add_avg("GenericTaskScheduleStrategy-rate-time", elapsed)
def start_system(self, task_system):
"""
        Start an incoming task system
"""
self.push_tasks(task_system.generate_tasks(self.master))
def push_tasks(self, tasks):
"""
Push all tasks on the global task queue
"""
for task in tasks:
self.push_task(task)
def push_task(self, task):
"""
Put a task on the global task queue
"""
        # Do not poison ourselves!
if task:
self.tasks.put(task)
def handle_task(self, task):
"""
        Send a task to a slave or, in case sending failed, queue the task back
"""
with self.lock.readlock:
reschedule = True
try:
slave_id = self.strategy.get_next_slave()
if slave_id:
#TODO: Pickle task and send to slave
task.slave_id = slave_id
start = time.time()
self.master.node_registry[task.slave_id].tcp_proxy.push_task(task)
#print("Sending task: {} in {}".format(task.name, time.time() - start))
reschedule = False
# Add task id to this slave so we could resend the task
self._tasked_pushed(task.slave_id)
except Exception as e:
#self.log.error("Failed to send task to slave: %s. Queueing task again!" % str(e))
self.stats.add_avg("GenericTaskScheduler-task-send-failed")
# Make sure we try it again!
if reschedule:
self.push_task(task)
def _tasked_pushed(self, slave_id):
"""
        A slave has acquired a new task, update its rank
"""
#with self.lock.readlock:
if self._valid_id_no_lock(slave_id):
self.master.node_registry[slave_id].tasks += 1
self.master.node_registry[slave_id].rating = self.strategy.rate(self.master.node_registry[slave_id])
#print("Push: {}".format(self.master.node_registry[slave_id].tasks))
def task_finished(self, task, result, error):
"""
A slave has finished a new task, update its rank
"""
task.finished(result, error)
        # Do not acquire any write lock if the id is not valid!
#with self.lock.readlock:
if self._valid_id_no_lock(task.slave_id):
self.master.node_registry[task.slave_id].tasks -= 1
self.master.node_registry[task.slave_id].rating = self.strategy.rate(self.master.node_registry[task.slave_id])
#print("Pop: {}".format(self.master.node_registry[task.slave_id].tasks))
#self.strategy.task_finished(result['task_id'] check results!)
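
# Illustrative sketch only: how a master node would typically drive the
# GenericTaskScheduler ('master' is assumed to expose the node_registry and
# registry_lock used above, 'my_system' to provide generate_tasks(master)):
#
#   scheduler.setup(master)                  # starts the schedule thread
#   scheduler.start_system(my_system)        # queue the generated tasks
#   ...
#   scheduler.stop()                         # poison pill stops the thread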
class ZMQTaskManager(Component, threading.Thread):
    """
    Task manager that pulls tasks from the master's ZMQ backend socket and
    hands them over to the local worker processes
    """
    implements(ITaskManager)
    num_workers = IntItem('ZMQTaskManager', 'num_workers', -1,
        """Number of worker processes to be created; -1 will spawn as many as there are physical cores.""")
    master_backend_port = HostItem('ZMQTaskManager', 'master_backend_port', 'localhost:5001',
        """Master's backend address (host:port) from which we will receive tasks.""")
def __init__(self):
threading.Thread.__init__(self)
Component.__init__(self)
# Some thread related stuff
self.daemon = True
self.kill_switch = False
        # Create the ZMQ context (the socket itself is created in run())
self.context = zmq.Context()
# Initialize base manager stuff
self._num_workers = 0
self.results = multiprocessing.JoinableQueue()
def init(self, identity, address):
"""
Initialize the manager
"""
self.identity = identity
self.host = address[0]
self.port = address[1]
self._num_workers = self.num_workers
if self._num_workers <= 0:
self._num_workers = multiprocessing.cpu_count()
# We now prepare our queues, both the joinable and the results
# queues. Then we just create a process for each worker
self.tasks = multiprocessing.JoinableQueue()
self.processes = [TaskProcess(self.results, i, self.tasks, self.identity, self.host, self.port) for i in range(self._num_workers)]
#self.processes = [TaskProcess(self.results, i) for i in range(self._num_workers)]
context = zmq.Context()
self.ventilator_send = context.socket(zmq.PUSH)
self.ventilator_send.bind("tcp://127.0.0.1:%d" % WORKER_PORT)
def get_num_workers(self):
"""
Return the number of workers we use for our processing
"""
return self._num_workers
def start(self):
"""
Start our worker processes
"""
threading.Thread.start(self)
for worker in self.processes:
worker.daemon = True
worker.start()
def stop(self):
"""
Stop our worker processes
"""
self.log.info("Shutting down ZMQTaskManager")
for i in xrange(self._num_workers):
#send_to_zmq_zipped(self.ventilator_send, None)
self.tasks.put(None)
# Poison for result listener
self.results.put(None)
# Kill our own thread
self.kill_switch = True
self.context.term()
        self.join(5)  # Thread.join takes seconds, not milliseconds
self.log.info("ZMQTaskManager shutdown finished")
def run(self):
self.log.info("ZMQTaskManager started")
# Create and connect to our scheduler socket
self.socket = self.context.socket(zmq.PULL)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.set_hwm(0)
self.socket.connect('tcp://{host}:{port}'.format(host=self.master_backend_port[0], port=self.master_backend_port[1]))
# Start receiving messages
while not self.kill_switch:
try:
next_task = receive_from_zmq_zipped(self.socket)
self.push_task(next_task)
except zmq.ContextTerminated:
break
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
pass # no message was ready
else:
break
            except Exception:
traceback.print_exc()
self.socket.close()
self.log.info("ZMQTaskManager stopped")
    def update_pool(self, _num_workers=-1):
        """
        Set the number of workers the task manager should use
        """
        self.stop()
        # init() expects identity and address, not a worker count; shadow the
        # configured pool size on the instance (assumption: instance assignment
        # overrides the IntItem default) and re-initialize with our stored data
        self.num_workers = _num_workers
        self.init(self.identity, (self.host, self.port))
        self.start()
def push_task(self, task):
"""
Push a task that should be completed by the workers
"""
try:
#send_to_zmq_zipped(self.ventilator_send, task)
self.tasks.put(task)
        except Exception:
traceback.print_exc()
return True
def wait_for_all(self):
"""
        Wait until all tasks have been finished
"""
pass
def get_results_queue(self):
"""
        Return a reference to the result queue
"""
return self.results
def task_finished(self, task, result, error):
"""
Called once a task has been performed
"""
task.finished(result, error)
class ZMQTaskScheduler(Component, threading.Thread):
    """
    Different task scheduler implementation using ZMQ push/pull sockets. Uses a simple round-robin
    mechanism to handle multiple slaves.
    """
    implements(ITaskScheduler)
frontend_port = IntItem('ZMQTaskScheduler', 'frontend_port', 5000,
"""Frontend port used to send tasks to the scheduler""")
backend_port = IntItem('ZMQTaskScheduler', 'backend_port', 5001,
"""Backend port used to send tasks to the scheduler. Slaves will receive tasks on it.""")
def __init__(self):
threading.Thread.__init__ (self)
Component.__init__(self)
self.stats = Stats.getInstance()
# Some thread related stuff
self.daemon = True
self.kill_switch = False
# The socket framework
self.context = zmq.Context()
self.frontend = self.context.socket(zmq.PULL)
self.frontend.bind('tcp://*:{port}'.format(port=self.frontend_port))
self.frontend.setsockopt(zmq.LINGER, 0)
self.frontend.set_hwm(0)
self.backend = self.context.socket(zmq.PUSH)
self.backend.bind('tcp://*:{port}'.format(port=self.backend_port))
self.backend.setsockopt(zmq.LINGER, 0)
self.backend.set_hwm(0)
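        # Note: a zmq.PUSH socket load-balances outgoing messages across all
        # connected PULL peers (round-robin), which is what distributes tasks
        # between slaves here without any explicit bookkeeping on our side.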
        # The poller is used to poll for incoming messages for both
        # the frontend (internet) and the backend (scheduling)
self.poll = zmq.Poller()
self.poll.register(self.frontend, zmq.POLLIN)
        # Socket connected locally to the frontend to send tasks; access to
        # this socket is guarded by self.lock so pushes are thread-safe
self.frontend_push = self.context.socket(zmq.PUSH)
self.frontend_push.connect('tcp://localhost:{port}'.format(port=self.frontend_port))
self.frontend_push.setsockopt(zmq.LINGER, 0)
self.frontend_push.set_hwm(0)
# Our lock used to protect the frontend_push socket
self.lock = threading.Lock()
def setup(self, master):
self.master = master
self.start()
def run(self):
self.log.info("ZMQTaskScheduler started")
# Start receiving messages
while not self.kill_switch:
try:
sockets = dict(self.poll.poll(1000))
if self.frontend in sockets:
msg = self.frontend.recv(flags=zmq.NOBLOCK)
#tprint('Server received message from %s' % (ident))
self.backend.send(msg, flags=zmq.NOBLOCK)
except zmq.Again:
                # Timeout just fired, no problem!
pass
except KeyboardInterrupt:
break
except zmq.ContextTerminated:
break
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
pass # no message was ready
else:
break
            except Exception:
                traceback.print_exc()
                # Not really good to just swallow this, but safer for now!
                pass
self.frontend.close()
self.backend.close()
with self.lock:
self.frontend_push.close()
self.context.term()
self.log.info("ZMQTaskScheduler stopped")
def stop(self):
self.log.info("Shutting down ZMQTaskScheduler")
self.kill_switch = True
        self.join(5)  # Thread.join takes seconds, not milliseconds
self.log.info("ZMQTaskScheduler shutdown finished")
def start_system(self, task_system):
"""
        Start an incoming task system
"""
self.push_tasks(task_system.generate_tasks(self.master))
def _push_task(self, task):
"""
No lock variant of push task method
"""
send_to_zmq_zipped(self.frontend_push, task)
def push_tasks(self, tasks):
"""
Push all tasks on the global task queue
"""
with self.lock:
            # DO NOT USE push_task to queue tasks here! self.lock is not
            # reentrant, so acquiring it again would deadlock!
for task in tasks:
self._push_task(task)
#self.tasks.put(task)
def push_task(self, task):
"""
Put a task on the global task queue
"""
with self.lock:
            # Do not poison ourselves!
if task:
self._push_task(task)
#self.tasks.put(task)
    def rate_slaves(self):
        """
        Update slaves. No-op here: the PUSH socket's round-robin does the balancing
        """
        pass
def _tasked_pushed(self, slave_id):
"""
        A slave has acquired a new task, update its rank
"""
pass
def task_finished(self, task, result, error):
"""
A slave has finished a new task, update its rank
"""
task.finished(result, error)
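
# Illustrative sketch only: what an external producer pushing work into the
# ZMQTaskScheduler frontend could look like. This assumes the wire format of
# send_to_zmq_zipped is a zlib-compressed pickle (see the sketch near the top
# of this module); host/port and 'my_task' are placeholders.
#
#   import pickle, zlib, zmq
#
#   context = zmq.Context()
#   push = context.socket(zmq.PUSH)
#   push.connect('tcp://localhost:5000')   # the scheduler's frontend_port
#   push.send(zlib.compress(pickle.dumps(my_task, 2)))
#   push.close()
#   context.term()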
class AdvancedTaskManager(Component):
    """
    Advanced task manager which drives task execution in discrete steps from
    the node's main loop (see the update logic outlined below)
    """
    implements(ITaskManager)
    #
    # Our async task update logic. This gets executed from our step
    # controller from the node's main loop.
    #
    # The update loop is composed of:
    # - Generate task list
    # - Execute tasks for the current step using the current delta time and step count
    # - Collect results and synchronize between systems (we still need to define what
    #   a system is, just a component that implements the system ExtensionPoint)
    #
def update(self):
"""
        Update call from the framework's main loop.
"""
pass
def _pre_execute(self):
"""
        Called before a step is executed
"""
pass
def _post_execute(self):
"""
Called after a step has been performed
"""
pass
def _execute(self):
"""
Execute a given step of the framework execution
"""
pass