# -*- coding: utf-8 -*-
"""
Module containing our base node entities
"""
from concurrent.core.config.config import IntItem, ExtensionPointItem, ConfigItem, FloatItem, BoolItem
from concurrent.core.transport.simplejsonrpc import SimpleJSONRPCService, jsonremote
from concurrent.core.transport.gzipper import Gzipper
from concurrent.core.transport.tcpserver import TCPClient, TCPServerProxyZMQ, TCPClientProxyZMQ
from concurrent.core.transport.tcpsocket import TCPSocket, send_to_zmq_multi
from concurrent.core.async.api import ITaskManager
from concurrent.core.async.threads import InterruptibleThread, ReadWriteLock, RWLockCache
from concurrent.core.application.api import APP_RET_CODE_SUCCESS, IPickler, NodeType, NodeState
from concurrent.core.util.stats import Stats
from concurrent.core.util.cryptohelper import CryptoHelper
import concurrent.core.transport.pyjsonrpc as pyjsonrpc
from bunch import Bunch
import uuid
import sys
import web
import time
import traceback
__all__ = ['BaseNode', 'ComputeNode', 'NodeType', 'NodeState', 'FailedToRegisterWithMaster']
class FailedToRegisterWithMaster(Exception):
    """
    Exception raised when a node fails to register itself with the master node
    on the compute channel.
"""
class NodeApp(web.application):
"""
Web app for our node
"""
def run(self, port, *middleware):
func = self.wsgifunc(*middleware)
return web.httpserver.runsimple(func, ('0.0.0.0', port))
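# Hypothetical usage sketch (the port and middleware choice are assumptions,
# not defaults of this class); mirrors how api_thread constructs the app:
#
#   app = NodeApp(BaseNode.urls, globals())
#   app.run(8080, Gzipper)  # serve the node API on 0.0.0.0:8080 with gzip middleware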
class NodeProxy(object):
"""
    Make proxy calls asynchronous using a ThreadPool: http://www.chrisarndt.de/projects/threadpool/
"""
def __init__(self, ProxyObject, log, ok_cb, ko_cb):
self._obj=ProxyObject
self.log = log
self.ok_cb = ok_cb
self.ko_cb = ko_cb
self.stats = Stats.getInstance()
class _Method(object):
def __init__(self, owner, proxy_obj, method, log, ok_cb, ko_cb):
self.owner = owner
self._obj=proxy_obj
self.method = method
self.log = log
self.ok_cb = ok_cb
self.ko_cb = ko_cb
def __call__(self, *args, **kwargs):
try:
start_time = time.time()
result = self._obj.call(self.method, *args, **kwargs)
self.owner.stats.add_avg('NodeProxy', time.time() - start_time)
self.ok_cb(self._obj, self.method, result)
return result
except Exception as e:
self.owner.stats.add_avg('NodeProxy', time.time() - start_time)
self.ko_cb(self._obj, self.method, e)
raise e
    def __call__(self, method, *args, **kwargs):
        # Direct-call path: proxy(method, ...) mirrors _Method.__call__
        try:
            start_time = time.time()
            result = self._obj.call(method, *args, **kwargs)
            self.stats.add_avg('NodeProxy', time.time() - start_time)
            self.ok_cb(self._obj, method, result)
            return result
        except Exception as e:
            self.stats.add_avg('NodeProxy', time.time() - start_time)
            self.ko_cb(self._obj, method, e)
            raise e
def __getattr__(self, method):
return self._Method(self, self._obj, method = method, log = self.log, ko_cb = self.ko_cb, ok_cb = self.ok_cb)
def dump_stats(self):
self.log.debug(self.stats.dump('NodeProxy'))
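# Hypothetical usage sketch (the URL is an assumption; the ok/ko callbacks come
# from the owning node via create_node_proxy):
#
#   proxy = node.create_node_proxy('masterhost:8080')
#   pong = proxy.ping()  # __getattr__ returns a _Method that performs the JSON RPC call
#   proxy.dump_stats()   # logs average call timings collected under 'NodeProxy'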
class TCPProxy(object):
"""
    TCP socket proxy using our JSON RPC protocol. The TCP proxy does not handle
    the answer to a given call; answers are received within the socket's
    own answer thread.
"""
def __init__(self, ProxyObject, log):
self._obj=ProxyObject
self.log = log
self.stats = Stats.getInstance()
class _Method(object):
def __init__(self, owner, proxy_obj, method, log):
self.owner = owner
self._obj=proxy_obj
self.method = method
self.log = log
def __call__(self, *args, **kwargs):
# Connect and close are very special
if self.method == "close":
self._obj.close()
elif self.method == "connect":
self._obj.connect()
else:
try:
start_time = time.time()
self._obj.send_to(self.method, *args, **kwargs)
except Exception as e:
raise e
finally:
self.owner.stats.add_avg('TCPProxy', time.time() - start_time)
    def __call__(self, method, *args, **kwargs):
        # Connect and close are very special
        if method == "close":
            self._obj.close()
        elif method == "connect":
            self._obj.connect()
        else:
            try:
                start_time = time.time()
                self._obj.send_to(method, *args, **kwargs)
            except Exception as e:
                raise e
            finally:
                self.stats.add_avg('TCPProxy', time.time() - start_time)
def __getattr__(self, method):
# Connect and close are very special
return self._Method(self, self._obj, method = method, log = self.log)
def dump_stats(self):
self.log.debug(self.stats.dump('TCPProxy'))
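# Hypothetical usage sketch (host, port and the remote method name are
# assumptions; answers arrive on the socket's own answer thread, not as
# return values):
#
#   proxy, client = node.create_tcp_proxy('masterhost', 9999)
#   proxy.connect()               # special-cased: opens the underlying socket
#   proxy.register_node(node_id)  # fire-and-forget JSON RPC over the socket
#   proxy.close()                 # special-cased: closes the underlying socket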
class TCPProxyZMQ(object):
"""
ZMQ client TCP socket proxy
"""
def __init__(self, socket, identity, log):
self.socket=socket
self.identity=identity
self.log = log
self.stats = Stats.getInstance()
class _Method(object):
def __init__(self, owner, method, log):
self.owner = owner
self.method = method
self.log = log
def __call__(self, *args, **kwargs):
if self.method != "close" and self.method != "connect":
try:
start_time = time.time()
send_to_zmq_multi(self.owner.socket, self.owner.identity, self.method, *args, **kwargs)
except Exception as e:
raise e
finally:
self.owner.stats.add_avg('TCPProxyZMQ', time.time() - start_time)
    def __call__(self, method, *args, **kwargs):
        if method != "close" and method != "connect":
            try:
                start_time = time.time()
                send_to_zmq_multi(self.socket, self.identity, method, *args, **kwargs)
            except Exception as e:
                raise e
            finally:
                self.stats.add_avg('TCPProxyZMQ', time.time() - start_time)
def __getattr__(self, method):
# Connect and close are very special
return self._Method(self, method = method, log = self.log)
def dump_stats(self):
        self.log.debug(self.stats.dump('TCPProxyZMQ'))
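# Hypothetical usage sketch (sock and request come from the ZMQ server's accept
# path; the method name is an assumption):
#
#   client_proxy = node.create_tcp_client_proxy(sock, request)
#   client_proxy.task_finished(task_id, result)  # routed to that client via its ZMQ identity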
class api_thread(InterruptibleThread):
"""
Thread holding our JSON API server
"""
def __init__(self, log, urls, port, use_gzip):
InterruptibleThread.__init__(self, log)
self.urls = urls
self.port = port
self.app = NodeApp(self.urls, globals())
#self.app.internalerror = web.debugerror
web.config.debug = False
self.use_gzip = use_gzip
self.gzipper = None
def run(self):
# Now launch our web app
if self.use_gzip:
self.app.run(self.port, Gzipper)
else:
self.app.run(self.port)
def stop(self):
try:
self.app.stop()
self.join()
except:
pass
self.log.info("API Server Stopped")
# Not a good idea though! stopping the server is enough!
#self.stop_and_wait()
class GlobalHook(Bunch):
"""
This is our global hook, used to save a reference to our global node
"""
global_hook = None
class index_get():
"""
API index URL stub
"""
def GET(self):
global global_hook
return global_hook.node.index()
class ping_get():
"""
    API ping URL stub
"""
def GET(self):
global global_hook
return global_hook.node.ping()
class status_get():
"""
    API status URL stub
"""
def GET(self):
global global_hook
return global_hook.node.status()
class stats_get():
"""
    API stats URL stub
"""
def GET(self):
global global_hook
return global_hook.node.get_stats()
class APIHandlerV1():
def POST(self):
global global_hook
if 'CONTENT_LENGTH' in web.ctx.env:
global_hook.node.stats.add_avg('api1-content-length',int(web.ctx.env['CONTENT_LENGTH']))
return global_hook.node.api_service_v1(web.webapi.data())
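# The POST body is a JSON RPC request dispatched by api_service_v1; a minimal
# sketch of a payload (the exact envelope depends on SimpleJSONRPCService):
#
#   {"method": "ping", "params": [], "id": 1}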
class BaseNode(object):
    """
    Base node; all nodes are at least of this type.
    Responsible for hosting and exposing a simple API,
    as well as listening on a TCP port for socket interactions.
"""
port = IntItem('node', 'port', 8080,
"""Port of the API interface with this node""")
use_gzip = BoolItem('node', 'use_gzip', True,
"""Check if we should gzip all interactions (recommended)""")
    pickler = ExtensionPointItem('node', 'pickler', IPickler, 'Pickler',
        """Pickler class used by the whole framework""")
proxy_api = IntItem('node', 'proxy_api', 1,
"""API version used for any client JSON RPC calls""")
proxy_username = ConfigItem('node', 'proxy_username', '',
"""Username used when performing API client calls""")
proxy_password = ConfigItem('node', 'proxy_password', '',
"""Password used when performing API client calls""")
heartbeat_timer = FloatItem('node', 'heartbeat_timer', 5.0,
"""Timer used to send periodically heartbeats to the master""")
stats_dump_timer = FloatItem('node', 'stats_dump_timer', 30.0,
"""Timer used to dump stats into the log. -1 will never dump stats.""")
    secret = ConfigItem('node', 'crypto_secret', 'JhTv535Vg385V',
        """Default secret used when encrypting/decrypting a pickle""")
    # salt size in bytes
    salt_size = IntItem('node', 'crypto_salt_size', 16,
        """Size of the salt used in the encryption process""")
    # number of iterations in the key generation
    num_iterations = IntItem('node', 'crypto_num_iterations', 20,
        """Number of iterations used in the key generation""")
    # the size multiple required for AES
    aes_padding = IntItem('node', 'crypto_aes_padding', 16,
        """Padding used for AES encryption""")
urls = (
# Get and basic API handling (not versioned!)
'/', 'index_get'
, '/ping/', 'ping_get'
        , '/ping', 'ping_get'
, '/status/', 'status_get'
, '/status', 'status_get'
, '/stats/', 'stats_get'
, '/stats', 'stats_get'
# Post API handling of version 1
, '/api/1/', 'APIHandlerV1'
, '/api/1', 'APIHandlerV1'
)
    def app_init(self):
"""
Initialize application just before running it
"""
self.lock_cache = RWLockCache()
    def app_main(self):
"""
Launch a concurrent application
"""
# Generate rest API
self.generate_api()
# Now run our API listener
self.log.debug("Hosting application on port %d" % (self.port))
# Get a ref to our stats helper
self.stats = Stats.getInstance()
        # Create crypto helper used for network communication
self.crypto_helper = CryptoHelper(self.salt_size, self.num_iterations, self.aes_padding)
# Make sure the URL proxy knows us
global global_hook
global_hook = GlobalHook({'node':self})
        # TODO: The API should only live on the master node, used for node registration
        # and heartbeats. Each node will have a socket, while slave nodes will have a
        # local server too. These servers are not web servers because those are too
        # expensive! Refactor the server and add a client connected to it through a
        # normal socket. The master server will act only as a controller and will
        # distribute work using a better-performing mechanism (UDP?). Use async calls
        # for heartbeats, for example. Create the server the same way the PP guys do
        # (see ppserver); try using a multithreaded pool to handle connections instead
        # of threads!
self.api_thread = api_thread(self.log, self.urls, self.port, self.use_gzip)
self.api_thread.daemon = True
self.api_thread.start()
self.heartbeat_threshold = self.heartbeat_timer
self.current_time = 0
self.last_time = 0
self.last_delta_time = 0
self.stats_dump_threshold = self.stats_dump_timer
# Bool flag used to control the main loop
self.kill_received = False
        # Give us some time until it's up
time.sleep(0.5)
return APP_RET_CODE_SUCCESS
    def stop_api_thread(self):
self.api_thread.stop()
    def main_loop(self):
# Register with master before anything
if self.has_master():
self.register_with_master()
self.last_time = time.time()
while not self.kill_received:
try:
# Calculate delta time for this frame
self.current_time = time.time()
delta_time = self.current_time - self.last_time
self.on_update(delta_time)
                # Save last time
self.last_time = self.current_time
self.last_delta_time = delta_time
except KeyboardInterrupt:
try:
if self.has_master():
self.unregister_from_master()
except Exception as e:
traceback.print_exc()
self.log.info("Exiting main loop")
self.kill_received = True
except Exception as e:
traceback.print_exc()
self.log.error("Mainloop exception: %s" % (e))
self.log.info("Main loop exited!")
    def shutdown_main_loop(self):
self.kill_received = True
    def on_update(self, delta_time):
        # Only dump if requested
if self.stats_dump_timer > 0:
self.stats_dump_threshold -= delta_time
if self.stats_dump_threshold < 0:
self.stats.dump_stats(self.log)
self.stats_dump_threshold = self.stats_dump_timer
    def generate_api(self):
# API service handler for version 1 (only version for now)
self.api_service_v1 = SimpleJSONRPCService(api_version=1)
@jsonremote(self.api_service_v1)
def ping(request):
return "pong"
@jsonremote(self.api_service_v1)
def status(request):
return self.status()
@jsonremote(self.api_service_v1)
def api(request):
return self.api_service_v1.api()
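        # Subclasses extend the API the same way; a minimal sketch (the exposed
        # name is an assumption, it simply forwards to the existing status()):
        #
        #   @jsonremote(self.api_service_v1)
        #   def node_status(request):
        #       return self.status()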
    def ping(self):
return "pong"
    def index(self):
return "OK"
    def status(self):
status = {
'node':self.__class__.__name__
, 'systeminfo': self.compmgr.systeminfo
}
return status
    def get_stats(self):
return self.stats.dump_all()
    def create_node_proxy(self, url):
        """
        Create a new JSON proxy instance used by the node when acting in a client role
        """
return NodeProxy(pyjsonrpc.HttpClient( url = ("http://%s/api/%d") % (url,self.proxy_api),
username = self.proxy_username,
password = self.proxy_password ), self.log,
self.rpc_call_success, self.rpc_call_failed)
    def create_tcp_proxy(self, host, port):
"""
Create a JSON TCP socket proxy instance to a server
"""
#tcp_client = TCPClient(self.log, host, port, self)
#return TCPProxy(tcp_client, self.log), tcp_client
tcp_client = TCPServerProxyZMQ(self.node_id_str, host, port, self.log)
return TCPProxy(tcp_client, self.log), tcp_client
    def create_tcp_client_proxy(self, sock, request):
"""
Create a JSON TCP socket proxy instance to a client
"""
return TCPProxyZMQ(sock, request, self.log)
    def create_tcp_client_proxy_zmq(self, context, identity):
"""
Create a JSON TCP socket proxy instance to a client
"""
return TCPProxy(TCPClientProxyZMQ(context, identity, self.log), self.log)
# TODO: Make every node stream large amounts of data over the normal socket: http://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
# Control channel is over the API channel and real-time interactions over the TCP socket (see transport module)
class result_thread(InterruptibleThread):
"""
Thread waiting for results from the workers
"""
def __init__(self, log, result_queue, callback):
InterruptibleThread.__init__(self, log)
self.callback = callback
self.result_queue = result_queue
def run(self):
while True:
result = self.result_queue.get()
if result is None:
# A None task is used to shut us down
self.result_queue.task_done()
break
self.result_queue.task_done()
            # If sending the result back failed, queue it again
if not self.callback(result.task, result.result, result.error):
self.result_queue.put(result)
self.log.info("result_thread exiting")
def stop(self):
try:
self.result_queue.put(None)
self.join()
except:
pass
self.log.info("result_thread stopped")
class Node(BaseNode):
"""
    A simple node is a node that has a master node. It handles the master's proxy and keeps
    the connection alive using heartbeats.
"""
def app_init(self):
"""
Initialize application just before running it
"""
super(Node, self).app_init()
# We create our own node_id, this will be unique everywhere!
self.node_id = uuid.uuid1()
self.node_id_str = str(self.node_id)
    def app_main(self):
        """
        Launch a concurrent application
        """
        result = super(Node, self).app_main()
        self.setup_master()
        return result
def on_update(self, delta_time):
super(Node, self).on_update(delta_time)
        # Handle heartbeat
self.heartbeat_threshold -= delta_time
if self.has_master() and self.heartbeat_threshold < 0:
self.send_heartbeat()
self.heartbeat_threshold = self.heartbeat_timer
def generate_api(self):
super(Node, self).generate_api()
@jsonremote(self.api_service_v1)
def master_disconnected(request):
return self.master_disconnected(True)
def master_disconnected(self, gracefully):
"""
        Called when a master disconnects (gracefully) or when we get no response from the master itself (ungraceful)
"""
raise NotImplementedError("Node has not implemented master_disconnected!")
def get_master_url(self):
"""
Get the URL where our master node is hosted
"""
raise NotImplementedError("Node has not implemented get_master_url!")
def get_master_address(self):
"""
        Get the address and port as a (host, port) tuple
"""
raise NotImplementedError("Node has not implemented get_master_address!")
def setup_master(self):
"""
        Nodes that claim to have a master should create the master proxy here
"""
self.master_node = self.create_node_proxy(self.get_master_url())
def register_with_master(self):
"""
The node will register itself with the expected master node
"""
raise NotImplementedError("Node has not implemented register_node!")
def unregister_from_master(self):
"""
The node will unregister itself with the expected master node
"""
raise NotImplementedError("Node has not implemented unregister_node!")
def send_heartbeat(self):
"""
Send heartbeat to master in case we have one
"""
raise NotImplementedError("Node has not implemented send_heartbeat!")
def rpc_call_failed(self, proxy, method, reason):
"""
Called when an RPC call failed for an unexpected reason
"""
raise NotImplementedError("Node has not implemented rpc_call_failed!")
    def rpc_call_success(self, proxy, method, result):
        """
        Called when an RPC call succeeded
        """
        raise NotImplementedError("Node has not implemented rpc_call_success!")
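# A minimal sketch of a concrete Node subclass (host, port and the master-side
# RPC method names are assumptions for illustration):
#
#   class SlaveNode(Node):
#       def get_master_url(self):
#           return "masterhost:8080"
#       def get_master_address(self):
#           return ("masterhost", 9999)
#       def register_with_master(self):
#           self.master_node.register_node(self.node_id_str)
#       def send_heartbeat(self):
#           self.master_node.heartbeat(self.node_id_str)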
class ComputeNode(Node):
    """
    A compute node is a base node that does computation. It features a set of worker
    processes that accept jobs, usually from a MasterNode.
"""
task_manager = ExtensionPointItem('computenode', 'task_manager', ITaskManager, 'GenericTaskManager',
"""Task manager used by this compute node""")
    def setup_compute_node(self):
"""
Launch the compute service from this node
"""
self.log.info("Initializing ComputeNode")
        self.task_manager.init(self.node_id_str, self.get_master_address())
        self.task_manager.start()
# Start results collector thread
self.collector_thread = None
self.collector_thread = result_thread(self.log, self.task_manager.get_results_queue(), self.task_finished)
self.collector_thread.daemon = True
self.collector_thread.start()
    def stop_compute_node(self):
self.task_manager.stop()
if self.collector_thread:
self.collector_thread.stop()
    def task_finished(self, task, result, error):
        """
        Called when a task has finished its computation; the result object contains the task,
        the result or an error, and additional information
        """
        raise NotImplementedError("Node must implement the on_result callback function")
    def get_num_workers(self):
"""
Return the number of workers the compute node has spawned
"""
return self.task_manager.get_num_workers()
    def generate_api(self):
"""
Create all rpc methods the node requires
"""
super(ComputeNode, self).generate_api()
@jsonremote(self.api_service_v1, doc='Push a task onto the computation node')
def push_task(request, task):
self.stats.add_avg('push_task')
start = time.time()
            newTask = self.pickler.unpickle_s(task)
            elapsed = time.time() - start
            self.stats.add_avg('push_task_unpickle_time', elapsed)
return self.push_task(newTask)
    def push_task(self, task):
"""
Push a task onto the computation framework
"""
return self.task_manager.push_task(task)
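# A minimal end-to-end sketch of handing a task to a compute node over the API
# (the host is an assumption, and pickle_s is assumed to be the counterpart of
# the unpickle_s call used in generate_api above):
#
#   pickled = node.pickler.pickle_s(my_task)        # serialize the task
#   proxy = node.create_node_proxy('computehost:8080')
#   proxy.push_task(pickled)                        # unpickled and queued on the compute node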