Source code for concurrent.core.transport.tcpserver

# -*- coding: utf-8 -*-
"""
Implementation of our socket server
"""

from concurrent.core.transport.tcpsocket import send_to, receive_from, send_to_zmq_zipped, send_to_zmq, send_to_zmq_multi, pickle_object, unpickle_message, VERSION, NoDataException, TCPSocket, TCPSocketZMQ
from concurrent.core.transport.pyjsonrpc.rpcerror import JsonRpcError
from concurrent.core.async.threads import InterruptibleThread
from concurrent.core.util.utils import tprint

import SocketServer
import threading
import traceback
import time
import socket
import errno
import zmq

__all__ = ['ThreadedSocketServer', 'tcpremote', 'TCPHandler', 'TCPServer', 'TCPClient']

class NoResponseRequired(Exception):
    """
    Exception raised when the executed method does not require a response. Used for Fire and 
    forget methods.
    """
 
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    def __init__(self, request, client_address, server):
        SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
        
    def setup(self):
        self.shutdown = False
        self.initial_connection = time.time()        
        self.server.client_connected(self.request, self.client_address, self)
        self.node_id = None
        self.node_type = None
        
    def handle(self):
        while not self.shutdown:
            # Decode data from socket
            try:
                send_to(self.request, *self.server.handle(self, self.request, receive_from(self.request)))
            except NoDataException:
                # No data means that there where nothign to read for and so the socket is dead
                self.shutdown = True
            except socket.error as e:
                if e.errno == errno.EINTR:
                    continue
                break
            except NoResponseRequired:
                # Method does not require a response to the socket, this is actually fine ^^
                pass
            except KeyboardInterrupt:
                break
            except:
                traceback.print_exc()
                # Not really good to just pass but saver for now!
                pass
        try:
            # Close socket just in case we let the resource open
            if self.request:
                self.request.close()
        except:
            # Not really an issue at this point
            pass
    
    def close(self):
        self.shutdown = True
    
    def finish(self):
        self.server.client_disconnected(self.request, self.client_address, self)
    
    def set_node_id(self, node_id, node_type):
        """
        Set the node_id used by this handler to link it to a registered node
        """
        self.node_id = node_id
        self.node_type = node_type

[docs]class TCPHandler(object): """ Very simple TCP protocol handler that translates incomming request to function calls """ def __init__(self): object.__init__(self) self.method_map = {}
[docs] def add_method(self, name, method): self.method_map[name] = method
[docs] def handle_rpc(self, handler, request, data): try: # TODO: Error handling, we will have the 'e' field within our data dict # TODO: Handle return of a simple ping-pong call to stop calling the client. (NoResponseRequired) v, method, params = data["v"], data["m"], [handler, request] + list(data["p"]) #print(method) if v != VERSION: return "{}_failed".format(method), {"c": -32600, "m": "Invalid Request"}, if method in self.method_map: try: result = self.method_map[method](*params) # No ping-pong for response calls if method.endswith('_failed') or method.endswith('_response'): raise NoResponseRequired() except JsonRpcError as e: traceback.print_exc() return "{}_failed".format(method), {"c": e.code, "m": e.message}, except TypeError: traceback.print_exc() return "{}_failed".format(method), {"c": -32602, "m": "Invalid params"}, return "{}_response".format(method), result else: # If the methods was not found and it was a fail or a response message just stop here if method.endswith('_failed') or method.endswith('_response'): raise NoResponseRequired() return "{}_failed".format(method), {"c": -32601, "m": "Method not found"}, except KeyError: traceback.print_exc() if method: return "{}_failed".format(method), {"c": -32700, "m": "Parse error"}, raise NoResponseRequired() except TypeError: traceback.print_exc() if method: return "{}_failed".format(method), {"c": -32600, "m": "Invalid Request"}, raise NoResponseRequired() except NoResponseRequired as e: # Fire up raise e except NotImplementedError as e: traceback.print_exc() raise e except Exception as e: traceback.print_exc() if method: return "{}_failed".format(method), {"c": -32603, "m": "Internal error", "e": e, "t": traceback.format_exc()}, raise NoResponseRequired()
[docs] def handle(self, handler, request, data): return self.handle_rpc(handler, request, data)
[docs]class TCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer, TCPHandler): """ Our threaded socket implementation is a JSON-RPC implementation using a SocketServer. We use this technique to be able to achieve high-performance in connectivity and sync between all nodes while being felxible and simple within our data interaction. """ daemon_threads = True allow_reuse_address = True def __init__(self, host, port, master): SocketServer.TCPServer.__init__(self, (host, port), ThreadedTCPRequestHandler) TCPHandler.__init__(self) self.master = master
[docs] def client_connected(self, request, client_address, handler): print('{}:{} connected'.format(*client_address))
[docs] def client_disconnected(self, request, client_address, handler): print('{}:{} disconnected'.format(*client_address))
class client_thread(InterruptibleThread): """ The client thread of a socket reads in a nice loop any response that comes from the server connection """ def __init__(self, tcp_client, log): InterruptibleThread.__init__(self, log) self.shutdown = False self.tcp_client = tcp_client def run(self): while not self.shutdown: # Decode data from socket try: self.tcp_client.send_to(*self.tcp_client.handle(self.tcp_client, self.tcp_client.sock, self.tcp_client.receive_from())) except NoDataException: # No data means that there where nothign to read for and so the socket is dead self.shutdown = True except socket.error as e: if e.errno == errno.EINTR: continue break except NoResponseRequired: # Method does not require a response to the socket, this is actually fine ^^ pass except KeyboardInterrupt: break except: traceback.print_exc() # Not really good to just pass but saver for now! pass try: # Close socket just in case we let the resource open if self.tcp_client.sock: self.tcp_client.sock.close() except: # Not really an issue at this point pass self.log.info("client_thread exiting") def stop(self): try: if self.tcp_client.sock: self.tcp_client.sock.close() except: pass self.shutdown = True # Not the best way but the safest... we can not wait for the server on termination. self.stop_and_wait() self.log.info("client_thread stopped")
[docs]class TCPClient(TCPSocket, TCPHandler): """ TCP client used to map protocol calling mechanisms to a given function. Just a special socket that does apart from sending and receiving the translation of our protocol. """ def __init__(self, log, host, port, node, socket1=None, socket_timeout=None): TCPSocket.__init__(self, host, port, node, socket1, socket_timeout) TCPHandler.__init__(self) self.master_thread = None self.log = log
[docs] def connect(self): """Connect and start the client thread to listen for responses""" TCPSocket.connect(self) self.master_thread = client_thread(self, self.log) self.master_thread.start()
[docs] def close(self): """Close socket connection and client thread""" try: TCPSocket.close(self) finally: # Alwasy stop the client thread! self.master_thread.stop() #try this: http://zguide.zeromq.org/py:mtserver #and this to optimize our IPC calls! http://taotetek.net/2011/02/03/python-multiprocessing-zeromq-vs-queue/
class TCPServerZMQ(threading.Thread, TCPHandler): """ TCP ZeroMQ async server. Spawns a number of workers that will respond to client requests """ def __init__(self, port, log, num_workers=5): threading.Thread.__init__ (self) self.log = log # Some thread related stuff self.daemon = True self.kill_switch = False # The frontend is where we get the request from outside # we will route them to our workers to get processed self.port = port self.num_workers = num_workers self.context = zmq.Context() self.frontend = self.context.socket(zmq.ROUTER) self.frontend.bind('tcp://*:{port}'.format(port=self.port)) self.frontend.setsockopt(zmq.LINGER, 0) self.frontend.set_hwm(0) # The backend is where we queue the requests that the workers # will start working on in round robbin fashion self.backend = self.context.socket(zmq.DEALER) self.backend.bind('inproc://backend') self.backend.setsockopt(zmq.LINGER, 0) self.backend.set_hwm(0) self.backend_client = self.context.socket(zmq.DEALER) self.backend_client.bind('inproc://backend-client') self.backend_client.setsockopt(zmq.LINGER, 0) self.backend_client.set_hwm(0) # The poller is used to poll for incomming messages for both # the frontend (internet) and the backend (scheduling) self.poll = zmq.Poller() self.poll.register(self.frontend, zmq.POLLIN) self.poll.register(self.backend, zmq.POLLIN) self.poll.register(self.backend_client, zmq.POLLIN) # Create workers self.workers = [TCPServerZMQWorker(self.context, self.log) for i in range(self.num_workers)] def stop(self): """ Stop server and workers """ self.log.info("Shutting down TCPServerZMQ") for worker in self.workers: worker.stop() self.kill_switch = True self.join(5000) self.log.info("TCPServerZMQ shutdown finished") def add_method(self, name, method): # We will just pass the handle to our workers for worker in self.workers: worker.add_method(name, method) def run(self): self.log.info("TCPServerZMQ started") # Create and launch workers for worker in self.workers: worker.start() # Start receiving messages while not self.kill_switch: try: sockets = dict(self.poll.poll(1000)) if self.frontend in sockets: ident, msg = self.frontend.recv_multipart(flags=zmq.NOBLOCK) #tprint('Server received message from %s' % (ident)) self.backend.send_multipart([ident, msg], flags=zmq.NOBLOCK) if self.backend in sockets: ident, msg = self.backend.recv_multipart(flags=zmq.NOBLOCK) #tprint('Sending message back to %s' % (ident)) self.frontend.send_multipart([ident, msg], flags=zmq.NOBLOCK) if self.backend_client in sockets: ident, msg = self.backend_client.recv_multipart() #tprint('Sending message back to %s' % (ident)) self.frontend.send_multipart([ident, msg], flags=zmq.NOBLOCK) except zmq.Again: # Timeouy just fired, no problem! pass except NoResponseRequired: # Method does not require a response to the socket, this is actually fine ^^ pass except NoDataException: # No data means that there where nothign to read for and so the socket is dead break except socket.error as e: if e.errno == errno.EINTR: continue break except KeyboardInterrupt: break except zmq.ContextTerminated: break except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: pass # no message was ready else: break except: traceback.print_exc() # Not really good to just pass but saver for now! pass self.frontend.close() self.backend.close() self.backend_client.close() self.context.term() self.log.info("TCPServerZMQ stopped") class TCPServerZMQWorker(threading.Thread, TCPHandler): """ServerWorker""" def __init__(self, context, log): threading.Thread.__init__ (self) TCPHandler.__init__(self) self.log = log # Worker stuff self.context = context self.worker = self.context.socket(zmq.DEALER) self.worker.RCVTIMEO = 1000 self.worker.setsockopt(zmq.LINGER, 0) self.worker.set_hwm(0) # Some thread related stuff self.daemon = True self.kill_switch = False def run(self): self.worker.connect('inproc://backend') self.log.info("TCPServerZMQWorker started") while not self.kill_switch: try: # Receive message and unpickle it ident, msg = self.worker.recv_multipart(flags=zmq.NOBLOCK) msg = unpickle_message(msg) #tprint('Worker received %s from %s' % (msg, ident)) # Handle message result = self.handle(self, ident, msg) # Send back to router send_to_zmq_multi(self.worker, ident, *result) except zmq.Again: # Timeouy just fired, no problem! pass except NoResponseRequired: # Method does not require a response to the socket, this is actually fine ^^ pass except NoDataException: # No data means that there where nothign to read for and so the socket is dead break except socket.error as e: if e.errno == errno.EINTR: continue break except KeyboardInterrupt: break except zmq.ContextTerminated: break except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: pass # no message was ready else: break except: traceback.print_exc() # Not really good to just pass but saver for now! pass self.worker.close() self.log.info("TCPServerZMQWorker stopped") def stop(self): self.log.info("Shutting down TCPServerZMQWorker") self.kill_switch = True self.join(1000) class TCPServerProxyZMQ(TCPSocketZMQ, threading.Thread, TCPHandler): """ TCP client using the ZeroMQ network framework """ def __init__(self, identity, host, port, log): TCPSocketZMQ.__init__(self, identity, host, port) TCPHandler.__init__(self) threading.Thread.__init__ (self) self.log = log # The backedn is where we queue the requests that the workers # will start working on in round robbin fashion self.backend = self.context.socket(zmq.DEALER) self.backend.bind('inproc://backend') # Before starting create socket poll self.poll = zmq.Poller() self.poll.register(self.socket, zmq.POLLIN) self.poll.register(self.backend, zmq.POLLIN) # Some thread related stuff self.daemon = True self.kill_switch = False # Our lock used to protect the backend socket self.lock = threading.Lock() def send_to(self, method, *args, **kwargs): """Send data to a socket""" with self.lock: send_to_zmq(self.backend, method, *args, **kwargs) #self.backend.send_multipart([self.identity, method]) #print("sending to backend end") def connect(self): """Connect and start the client thread to listen for responses""" self.start() def close(self): """Close socket connection and client thread""" try: self.context.term() finally: # Alwasy stop the client thread! self.stop() def run(self): TCPSocketZMQ.connect(self) self.backend.identity = self.identity.encode('ascii') self.backend.connect('inproc://backend') self.log.info("TCPServerProxyZMQ started") while not self.kill_switch: try: sockets = dict(self.poll.poll(1000)) if self.socket in sockets: msg = unpickle_message(self.socket.recv(flags=zmq.NOBLOCK)) #tprint('From server') result = self.handle(self, self.identity, msg) send_to_zmq(self.socket, *result) if self.backend in sockets: msg = self.backend.recv(flags=zmq.NOBLOCK) #tprint('To Server') self.socket.send(msg, flags=zmq.NOBLOCK) except zmq.Again: # Timeouy just fired, no problem! pass except NoResponseRequired: # Method does not require a response to the socket, this is actually fine ^^ pass except NoDataException: # No data means that there where nothign to read for and so the socket is dead break except socket.error as e: if e.errno == errno.EINTR: continue break except KeyboardInterrupt: break except zmq.ContextTerminated: break except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: pass # no message was ready else: break except: traceback.print_exc() # Not really good to just pass but saver for now! pass # Close socket self.socket.close() self.backend.close() self.log.info("TCPServerProxyZMQ stopped") def stop(self): """ Stop socket and thread """ self.log.info("Shutting down TCPServerProxyZMQ") self.kill_switch = True self.join(1000) class TCPClientProxyZMQ(): """ TCP client using the ZeroMQ network framework """ def __init__(self, context, identity, log): self.log = log self.context = context self.identity = identity # The backend which our server will process (or some other workers might) self.backend = self.context.socket(zmq.DEALER) self.backend.identity = self.identity.encode('ascii') self.backend.connect('inproc://backend-client') # Our lock used to protect the backend socket self.lock = threading.Lock() def send_to(self, method, *args, **kwargs): """Send data to a socket""" with self.lock: send_to_zmq_multi(self.backend, self.identity, method, *args, **kwargs) #print("sending to backend ends")
[docs]def tcpremote(tcp_opbject, name=None): """ makes TCPServer or TCPClient a decorator so that you can write : from tcpserver import * server = TCPServer(...) @tcpremote(server, name='login') def login(request, client_address, user_name, user_pass): (...) """ def remotify(func): if isinstance(tcp_opbject, TCPHandler): func_name = name if func_name is None: func_name = func.__name__ tcp_opbject.add_method(func_name, func) else: raise NotImplementedError('Server "%s" not an instance of TCPServer' % str(tcp_opbject)) return func return remotify