Source code for concurrent.core.transport.pyjsonrpc.rpcrequest

#!/usr/bin/env python
# coding: utf-8

import sys
import traceback
import uuid
import rpcerror
from bunch import Bunch
from rpcjson import json, JsonParseError


class Request(Bunch):
    """
    JSON-RPC-Request

    Container for the four members of a JSON-RPC request. The values are
    stored through ``Bunch`` attribute assignment and read back with the
    dict-style ``get`` in :meth:`get_splitted_params`.

    :param jsonrpc: Protocol version as sent by the client (may be ``None``)
    :param method: Name of the method to call
    :param id: Request id (``None`` if the client sent none)
    :param params: Parameters: a list/tuple (positional), a dict (named,
        optionally holding positional values under the ``"__args"`` key),
        or ``None``
    """

    def __init__(
        self,
        jsonrpc = None,
        method = None,
        id = None,
        params = None
    ):
        Bunch.__init__(self)
        self.jsonrpc = jsonrpc
        self.method = method
        self.id = id
        self.params = params

    def get_splitted_params(self):
        """
        Split positional and named params

        The stored ``params`` value is never modified, so this method can be
        called repeatedly and always returns the same result.

        :returns: positional_params, named_params
            - list/tuple params -> ``(params, {})``
            - dict params -> ``(params["__args"] or [], copy of the dict
              without the "__args" key)``
            - anything else (e.g. ``None``) -> ``([], {})``
        """

        params = self.get("params", [])

        # Tuples are accepted as well: create_request_dict() stores the
        # positional arguments as a tuple when no keyword arguments are given.
        if isinstance(params, (list, tuple)):
            return params, {}

        if isinstance(params, dict):
            # Work on a shallow copy so the request keeps its "__args" entry;
            # deleting it from the stored dict would make a second call lose
            # the positional parameters.
            named_params = dict(params)
            # The reserved key is always removed, even when it is empty or
            # null, so it can never leak into the keyword arguments.
            positional_params = named_params.pop("__args", None) or []
            return positional_params, named_params

        return [], {}
def _request_from_dict(item):
    """
    Builds a single Request from one decoded JSON-RPC request object.

    :param item: Decoded JSON value that is expected to be a dict
    :raises rpcerror.InvalidRequest: If *item* is not a dict
    """

    # Valid JSON that is not an object (number, string, null, nested list)
    # has no ".get"; report it as an invalid request instead of letting an
    # AttributeError escape.
    if not isinstance(item, dict):
        raise rpcerror.InvalidRequest()

    return Request(
        jsonrpc = item.get("jsonrpc"),
        method = str(item.get("method", "")),
        id = item.get("id"),
        params = item.get("params", [])
    )


def parse_request_json(json_string):
    """
    Returns RPC-request as Request object or as list with Request objects

    :param json_string: JSON text containing one request object or a list
        (batch) of request objects
    :returns: A single ``Request`` if the JSON text is an object, a list of
        ``Request`` objects (possibly empty) if it is a list
    :raises rpcerror.InvalidRequest: If *json_string* is ``None`` or if the
        decoded value (or any batch item) is not a JSON object
    :raises rpcerror.ParseError: If the text can not be decoded; the
        formatted traceback is passed along as ``data``
    """

    # No JSON-String
    if json_string is None:
        raise rpcerror.InvalidRequest()

    # Parse
    try:
        data = json.loads(json_string)
    except JsonParseError:
        raise rpcerror.ParseError(data = traceback.format_exc())

    # Create request(s)
    if isinstance(data, list):
        return [_request_from_dict(item) for item in data]
    return _request_from_dict(data)
def create_request_dict(method, *args, **kwargs):
    """
    Builds the dictionary form of a JSON-RPC 2.0 request.

    :param method: Name of the method
    :param args: Positional parameters
    :param kwargs: Named parameters

    :returns: Dictionary with the keys "method", "id", "jsonrpc" and
        "params". "id" is a freshly generated UUID4 string. "params" is the
        tuple of positional parameters when no named parameters are given;
        otherwise it is the dict of named parameters, with the positional
        ones (if any) stored under the "__args" key.
    """

    if not kwargs:
        # Purely positional call (or no parameters at all)
        payload = args
    else:
        payload = kwargs
        if args:
            # Positional values travel inside the named params
            payload["__args"] = args

    method_name = unicode(method)
    request_id = unicode(uuid.uuid4())

    return {
        "method": method_name,
        "id": request_id,
        "jsonrpc": u"2.0",
        "params": payload
    }
def create_request_json(method, *args, **kwargs):
    """
    Builds a JSON-RPC request for a method and serializes it to a string.

    The request is assembled by ``create_request_dict`` and then dumped
    with ``json.dumps``.

    :param method: Name of the method
    :param args: Positional parameters
    :param kwargs: Named parameters

    :returns: JSON-RPC-String
    """

    request_dict = create_request_dict(method, *args, **kwargs)
    return json.dumps(request_dict)