Source code for concurrent.core.util.utils

# -*- coding: utf-8 -*-
"""
Some usefull base methods and helpers
"""
import errno
import locale
import os
import re
import sys
import time
import tempfile


try:
    # Python 2
    from urllib import quote, unquote, urlencode
except ImportError:
    # Python 3
    from urllib.parse import quote, unquote, urlencode

try:
    # Python 2
    from itertools import izip
except ImportError:
    # Python 3
    izip = zip

# our imports
from concurrent.core.util.texttransforms import *

[docs]def is_number(s): try: float(s) return True except ValueError: return False
[docs]def is_digit(s): try: int(s) return True except ValueError: return False
[docs]def tprint(msg): """ Print function used to not confuse newlines when printing from different therads """ sys.stdout.write(msg + '\n') sys.stdout.flush() # -- algorithmic utilities
DIGITS = re.compile(r'(\d+)')
[docs]def embedded_numbers(s): """Comparison function for natural order sorting based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/214202.""" pieces = DIGITS.split(s) pieces[1::2] = map(int, pieces[1::2]) return pieces # -- os utilities
[docs]class NaivePopen: """This is a deadlock-safe version of popen that returns an object with errorlevel, out (a string) and err (a string). The optional `input`, which must be a `str` object, is first written to a temporary file from which the process will read. (`capturestderr` may not work under Windows 9x.) Example: print Popen3('grep spam','\n\nhere spam\n\n').out """ def __init__(self, command, input=None, capturestderr=None): outfile = tempfile.mktemp() command = '( %s ) > %s' % (command, outfile) if input: infile = tempfile.mktemp() tmp = open(infile, 'w') tmp.write(input) tmp.close() command = command + ' <' + infile if capturestderr: errfile = tempfile.mktemp() command = command + ' 2>' + errfile try: self.err = None self.errorlevel = os.system(command) >> 8 outfd = file(outfile, 'r') self.out = outfd.read() outfd.close() if capturestderr: errfd = file(errfile,'r') self.err = errfd.read() errfd.close() finally: if os.path.isfile(outfile): os.remove(outfile) if input and os.path.isfile(infile): os.remove(infile) if capturestderr and os.path.isfile(errfile): os.remove(errfile) # -- sys utils
[docs]def arity(f): return f.func_code.co_argcount
[docs]def get_last_traceback(): import traceback from StringIO import StringIO tb = StringIO() traceback.print_exc(file=tb) return to_unicode(tb.getvalue())
[docs]def get_lines_from_file(filename, lineno, context=0): """Return `content` number of lines before and after the specified `lineno` from the file identified by `filename`. Returns a `(lines_before, line, lines_after)` tuple. """ if os.path.isfile(filename): fileobj = open(filename, 'U') try: lines = fileobj.readlines() lbound = max(0, lineno - context) ubound = lineno + 1 + context charset = None rep = re.compile('coding[=:]\s*([-\w.]+)') for linestr in lines[0], lines[1]: match = rep.search(linestr) if match: charset = match.group(1) break before = [to_unicode(l.rstrip('\n'), charset) for l in lines[lbound:lineno]] line = to_unicode(lines[lineno].rstrip('\n'), charset) after = [to_unicode(l.rstrip('\n'), charset) \ for l in lines[lineno + 1:ubound]] return before, line, after finally: fileobj.close() return (), None, ()
[docs]def safe__import__(module_name): """ Safe imports: rollback after a failed import. Initially inspired from the RollbackImporter in PyUnit, but it's now much simpler and works better for our needs. See http://pyunit.sourceforge.net/notes/reloading.html """ already_imported = sys.modules.copy() try: return __import__(module_name, globals(), locals(), []) except Exception as e: for modname in sys.modules.copy(): if not already_imported.has_key(modname): del(sys.modules[modname]) raise e # -- setuptools utils
[docs]def get_module_path(module): # Determine the plugin that this component belongs to path = module.__file__ module_name = module.__name__ if path.endswith('.pyc') or path.endswith('.pyo'): path = path[:-1] if os.path.basename(path) == '__init__.py': path = os.path.dirname(path) base_path = os.path.splitext(path)[0] while base_path.replace(os.sep, '.').endswith(module_name): base_path = os.path.dirname(base_path) module_name = '.'.join(module_name.split('.')[:-1]) if not module_name: break return base_path
[docs]def get_pkginfo(dist): """Get a dictionary containing package information for a package `dist` can be either a Distribution instance or, as a shortcut, directly the module instance, if one can safely infer a Distribution instance from it. Always returns a dictionary but it will be empty if no Distribution instance can be created for the given module. """ import types if isinstance(dist, types.ModuleType): try: from pkg_resources import find_distributions module = dist module_path = get_module_path(module) for dist in find_distributions(module_path, only=True): if os.path.isfile(module_path) or \ dist.key == module.__name__.lower(): break else: return {} except ImportError: return {} import email attrs = ('author', 'author-email', 'license', 'home-page', 'summary', 'description', 'version') info = {} def normalize(attr): return attr.lower().replace('-', '_') try: pkginfo = email.message_from_string(dist.get_metadata('PKG-INFO')) for attr in [key for key in attrs if key in pkginfo]: info[normalize(attr)] = pkginfo[attr] except IOError as e: err = 'Failed to read PKG-INFO file for %s: %s' % (dist, e) for attr in attrs: info[normalize(attr)] = err except email.Errors.MessageError as e: err = 'Failed to parse PKG-INFO file for %s: %s' % (dist, e) for attr in attrs: info[normalize(attr)] = err return info # -- misc. utils
[docs]class Ranges(object): """ Holds information about ranges parsed from a string >>> x = Ranges("1,2,9-15") >>> 1 in x True >>> 5 in x False >>> 10 in x True >>> 16 in x False >>> [i for i in range(20) if i in x] [1, 2, 9, 10, 11, 12, 13, 14, 15] Also supports iteration, which makes that last example a bit simpler: >>> list(x) [1, 2, 9, 10, 11, 12, 13, 14, 15] Note that it automatically reduces the list and short-circuits when the desired ranges are a relatively small portion of the entire set: >>> x = Ranges("99") >>> 1 in x # really fast False >>> x = Ranges("1, 2, 1-2, 2") # reduces this to 1-2 >>> x.pairs [(1, 2)] >>> x = Ranges("1-9,2-4") # handle ranges that completely overlap >>> list(x) [1, 2, 3, 4, 5, 6, 7, 8, 9] The members 'a' and 'b' refer to the min and max value of the range, and are None if the range is empty: >>> x.a 1 >>> x.b 9 >>> e = Ranges() >>> e.a, e.b (None, None) Empty ranges are ok, and ranges can be constructed in pieces, if you so choose: >>> x = Ranges() >>> x.appendrange("1, 2, 3") >>> x.appendrange("5-9") >>> x.appendrange("2-3") # reduce'd away >>> list(x) [1, 2, 3, 5, 6, 7, 8, 9] ''Code contributed by Tim Hatch'' """ RE_STR = r"""\d+(?:[-:]\d+)?(?:,\d+(?:[-:]\d+)?)*""" def __init__(self, r=None): self.pairs = [] self.a = self.b = None self.appendrange(r)
[docs] def appendrange(self, r): """Add a range (from a string or None) to the current one""" if not r: return p = self.pairs for x in r.split(","): try: a, b = map(int, x.split('-', 1)) except ValueError: a, b = int(x), int(x) if b >= a: p.append((a, b)) self._reduce()
def _reduce(self): """Come up with the minimal representation of the ranges""" p = self.pairs p.sort() i = 0 while i + 1 < len(p): if p[i+1][0]-1 <= p[i][1]: # this item overlaps with the next # make the first include the second p[i] = (p[i][0], max(p[i][1], p[i+1][1])) del p[i+1] # delete the second, after adjusting my endpoint else: i += 1 if p: self.a = p[0][0] # min value self.b = p[-1][1] # max value else: self.a = self.b = None def __iter__(self): """ This is another way I came up with to do it. Is it faster? from itertools import chain return chain(*[xrange(a, b+1) for a, b in self.pairs]) """ for a, b in self.pairs: for i in range(a, b+1): yield i def __contains__(self, x): """ >>> 55 in Ranges() False """ # short-circuit if outside the possible range if self.a is not None and self.a <= x <= self.b: for a, b in self.pairs: if a <= x <= b: return True if b > x: # short-circuit if we've gone too far break return False def __str__(self): """Provide a compact string representation of the range. >>> (str(Ranges("1,2,3,5")), str(Ranges()), str(Ranges('2'))) ('1-3,5', '', '2') >>> str(Ranges('99-1')) # only nondecreasing ranges allowed '' """ r = [] for a, b in self.pairs: if a == b: r.append(str(a)) else: r.append("%d-%d" % (a, b)) return ",".join(r) def __len__(self): """The length of the entire span, ignoring holes. >>> (len(Ranges('99')), len(Ranges('1-2')), len(Ranges(''))) (1, 2, 0) """ if self.a is not None and self.b is not None: return self.b - self.a + 1 else: return 0
[docs]def to_ranges(revs): """Converts a list of revisions to a minimal set of ranges. >>> to_ranges([2, 12, 3, 6, 9, 1, 5, 11]) '1-3,5-6,9,11-12' >>> to_ranges([]) '' """ ranges = [] begin = end = None def store(): if end == begin: ranges.append(str(begin)) else: ranges.append('%d-%d' % (begin, end)) for rev in sorted(revs): if begin is None: begin = end = rev elif rev == end + 1: end = rev else: store() begin = end = rev if begin is not None: store() return ','.join(ranges)
[docs]def content_disposition(type, filename=None): """Generate a properly escaped Content-Disposition header""" if filename is not None: if isinstance(filename, unicode): filename = filename.encode('utf-8') type += '; filename=' + quote(filename, safe='') return type
[docs]def partition(iterable, order=None): result = {} if order is not None: for key in order: result[key] = [] for item, category in iterable: result.setdefault(category, []).append(item) if order is None: return result return [result[key] for key in order]