Viewing file: util.py (29.03 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- test-case-name: twisted.python.test.test_util -*- # Copyright (c) 2001-2010 Twisted Matrix Laboratories. # See LICENSE for details.
import os, sys, hmac, errno, new, inspect, warnings try: import pwd, grp except ImportError: pwd = grp = None try: from os import setgroups, getgroups except ImportError: setgroups = getgroups = None from UserDict import UserDict
class InsensitiveDict:
    """
    Dictionary that has case-insensitive keys.

    String keys are stored under their lowercased form (any other key is used
    unchanged), together with the key exactly as it was supplied.  Normally
    keys are reported in that original form by C{keys()} and C{items()}.  If
    the dictionary is created with C{preserve=0}, keys are both looked up in
    lowercase and returned in lowercase by C{keys()} and C{items()}.

    @ivar data: Maps each normalized key to an C{(originalKey, value)} pair.
    @ivar preserve: Whether keys are reported in their original case.
    """
    # Modified recipe at
    # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66315
    # originally contributed by Sami Hangaslammi.

    def __init__(self, dict=None, preserve=1):
        """
        Create an empty dictionary, or update from C{dict}.

        @param dict: An optional mapping (anything with C{items()}) whose
            contents are copied in.
        @param preserve: If true, report keys in their original case;
            otherwise report them lowercased.
        """
        self.data = {}
        self.preserve = preserve
        if dict:
            self.update(dict)

    def __delitem__(self, key):
        """Remove the entry for C{key} (in any case)."""
        del self.data[self._lowerOrReturn(key)]

    def _lowerOrReturn(self, key):
        """Return the form of C{key} used as the internal storage key."""
        if isinstance(key, str) or isinstance(key, unicode):
            return key.lower()
        return key

    def __getitem__(self, key):
        """Retrieve the value associated with C{key} (in any case)."""
        return self.data[self._lowerOrReturn(key)][1]

    def __setitem__(self, key, value):
        """
        Associate C{value} with C{key}.  If C{key} already exists, but in
        different case, it will be replaced.
        """
        self.data[self._lowerOrReturn(key)] = (key, value)

    def has_key(self, key):
        """Case insensitive test whether C{key} exists."""
        return self._lowerOrReturn(key) in self.data

    __contains__ = has_key

    def _doPreserve(self, key):
        """Return C{key} in the form in which it should be reported."""
        if not self.preserve and (isinstance(key, str)
                                  or isinstance(key, unicode)):
            return key.lower()
        return key

    def keys(self):
        """List of keys in their original case."""
        return list(self.iterkeys())

    def values(self):
        """List of values."""
        return list(self.itervalues())

    def items(self):
        """List of (key, value) pairs."""
        return list(self.iteritems())

    def get(self, key, default=None):
        """
        Retrieve the value associated with C{key}, or return C{default} if
        C{key} doesn't exist.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def setdefault(self, key, default):
        """
        If C{key} doesn't exist, associate it with the C{default} value.
        Return the value associated with C{key}.
        """
        if not self.has_key(key):
            self[key] = default
        return self[key]

    def update(self, dict):
        """Copy (key, value) pairs from C{dict}."""
        for k, v in dict.items():
            self[k] = v

    def __repr__(self):
        """String representation of the dictionary."""
        items = ", ".join([("%r: %r" % (k, v)) for k, v in self.items()])
        return "InsensitiveDict({%s})" % items

    def iterkeys(self):
        """Iterate over the keys, in the form selected by C{preserve}."""
        for lowered in self.data:
            yield self._doPreserve(self.data[lowered][0])

    def itervalues(self):
        """Iterate over the stored values."""
        for lowered in self.data:
            yield self.data[lowered][1]

    def iteritems(self):
        """Iterate over (key, value) pairs."""
        for lowered in self.data:
            original, value = self.data[lowered]
            yield self._doPreserve(original), value

    def popitem(self):
        """Remove one (key, value) pair and return it."""
        i = self.items()[0]
        del self[i[0]]
        return i

    def clear(self):
        """Remove every entry."""
        for k in self.keys():
            del self[k]

    def copy(self):
        """Return a new L{InsensitiveDict} with the same contents."""
        return InsensitiveDict(self, self.preserve)

    def __len__(self):
        return len(self.data)

    def __eq__(self, other):
        """
        Two mappings are equal when they have the same number of entries and
        every (key, value) pair of this one is found in C{other}.
        """
        for k, v in self.items():
            if not (k in other) or not (other[k] == v):
                return False
        return len(self) == len(other)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this, two equal
        # dictionaries would still compare as "not equal".
        return not self.__eq__(other)
class OrderedDict(UserDict):
    """
    A UserDict that preserves insert order whenever possible.

    @ivar _order: The keys currently stored, in the order in which they were
        first inserted.
    """

    def __init__(self, dict=None, **kwargs):
        """
        @param dict: Optional initial contents: either a mapping (anything
            with a C{keys} attribute) or a sequence of (key, value) pairs.
        @param kwargs: Additional items, added after those from C{dict}.
        """
        self._order = []
        self.data = {}
        if dict is not None:
            if hasattr(dict, 'keys'):
                self.update(dict)
            else:
                for k, v in dict:  # sequence
                    self[k] = v
        if len(kwargs):
            self.update(kwargs)

    def __repr__(self):
        return '{' + ', '.join([('%r: %r' % item) for item in self.items()]) + '}'

    def __setitem__(self, key, value):
        # Only a brand new key gets a position; overwriting keeps the old one.
        if key not in self.data:
            self._order.append(key)
        UserDict.__setitem__(self, key, value)

    def copy(self):
        return self.__class__(self)

    def __delitem__(self, key):
        UserDict.__delitem__(self, key)
        self._order.remove(key)

    def iteritems(self):
        for item in self._order:
            yield (item, self[item])

    def items(self):
        return list(self.iteritems())

    def itervalues(self):
        for item in self._order:
            yield self[item]

    def values(self):
        return list(self.itervalues())

    def iterkeys(self):
        return iter(self._order)

    def keys(self):
        return list(self._order)

    def popitem(self):
        """Remove and return the most recently inserted (key, value) pair."""
        key = self._order[-1]
        value = self[key]
        del self[key]
        return (key, value)

    def pop(self, key, *args):
        """
        Remove C{key} and return its value.  If C{key} is missing, return the
        optional default if one was given, otherwise raise C{KeyError}.
        """
        if len(args) > 1:
            raise TypeError(
                "pop expected at most 2 arguments, got %d" % (len(args) + 1,))
        try:
            value = self[key]
        except KeyError:
            if args:
                return args[0]
            raise
        # Go through __delitem__ so that _order is updated as well.
        del self[key]
        return value

    def clear(self):
        """Remove every item, forgetting the insertion order too."""
        self.data.clear()
        del self._order[:]

    def setdefault(self, item, default):
        if item in self.data:
            return self[item]
        self[item] = default
        return default

    def update(self, d):
        for k, v in d.items():
            self[k] = v
def uniquify(lst):
    """
    Return the elements of C{lst} with duplicates removed.

    Only the first occurrence of each element is kept, so the relative order
    of the input is not changed.  Elements must be hashable.

    @param lst: The iterable to filter.
    @return: A new C{list}.
    """
    seen = set()
    unique = []
    for element in lst:
        if element in seen:
            continue
        seen.add(element)
        unique.append(element)
    return unique
def padTo(n, seq, default=None):
    """
    Pad a sequence out to C{n} elements, filling in with a default value if
    it is not long enough.

    This returns a new list; it does not extend the original sequence.  The
    new list contains the values of the original sequence, not copies.

    @param n: The length of the list to return.
    @param seq: The sequence supplying the leading elements.
    @param default: The value used for the padding.

    @raise ValueError: If C{seq} is longer than C{n}.
    """
    length = len(seq)
    if length > n:
        raise ValueError("%d elements is more than %d." % (length, n))
    return list(seq) + [default] * (n - length)
def getPluginDirs():
    """
    List the plugin directories which exist on this system.

    Three locations are considered, in this order: a C{plugins} directory
    next to the directory containing the C{twisted} package,
    C{~/TwistedPlugins} and C{~/.twisted}.

    @return: A C{list} of those locations which are existing directories.
    """
    import twisted
    packageDir = os.path.dirname(os.path.abspath(twisted.__file__))
    candidates = [
        os.path.join(os.path.dirname(packageDir), 'plugins'),
        os.path.expanduser("~/TwistedPlugins"),
        os.path.expanduser("~/.twisted"),
    ]
    return [path for path in candidates if os.path.isdir(path)]
def addPluginDir():
    """
    Append every directory reported by L{getPluginDirs} to C{sys.path}.
    """
    for pluginDir in getPluginDirs():
        sys.path.append(pluginDir)
def sibpath(path, sibling):
    """
    Return the path to a sibling of a file in the filesystem.

    This is useful in conjunction with the special C{__file__} attribute that
    Python provides for modules, so modules can load associated resource
    files.

    @param path: The path of the reference file; it is made absolute first.
    @param sibling: The name of a file in the same directory as C{path}.
    """
    directory = os.path.dirname(os.path.abspath(path))
    return os.path.join(directory, sibling)
def _getpass(prompt):
    """
    Helper to turn IOErrors into KeyboardInterrupts.

    Prompt with C{getpass.getpass}.  An end-of-file condition, or an
    C{IOError} whose errno is C{EINTR}, is reported as C{KeyboardInterrupt};
    any other C{IOError} is re-raised unchanged.
    """
    import getpass
    try:
        return getpass.getpass(prompt)
    except EOFError:
        raise KeyboardInterrupt
    except IOError as e:
        if e.errno != errno.EINTR:
            raise
        raise KeyboardInterrupt
def getPassword(prompt = 'Password: ', confirm = 0, forceTTY = 0,
                confirmPrompt = 'Confirm password: ',
                mismatchMessage = "Passwords don't match."):
    """
    Obtain a password by prompting or from stdin.

    If stdin is a terminal, prompt for a new password, and confirm (if
    C{confirm} is true) by asking again to make sure the user typed the same
    thing, as keystrokes will not be echoed.

    If stdin is not a terminal, and C{forceTTY} is not true, read in a line
    and use it as the password, less the trailing newline, if any.  If
    C{forceTTY} is true, attempt to open a tty and prompt for the password
    using it.  Raise a RuntimeError if this is not possible.

    @param prompt: The text shown when asking for the password.
    @param confirm: If true, ask twice and retry until both answers match.
    @param forceTTY: If true and stdin is not a terminal, use C{/dev/tty}.
    @param confirmPrompt: The text shown when asking for the confirmation.
    @param mismatchMessage: Written to stderr when the two answers differ.

    @raise RuntimeError: If C{forceTTY} is true and no tty can be opened.

    @returns: C{str}
    """
    isaTTY = hasattr(sys.stdin, 'isatty') and sys.stdin.isatty()

    # The (stdin, stdout) pair to restore, set only once they have really
    # been replaced by the tty.
    old = None
    try:
        if not isaTTY:
            if forceTTY:
                try:
                    tty = open('/dev/tty', 'r+')
                except (IOError, OSError):
                    raise RuntimeError("Cannot obtain a TTY")
                # Record the originals only after the open succeeded, so a
                # failure above can never lead to closing the real stdin.
                old = sys.stdin, sys.stdout
                sys.stdin = sys.stdout = tty
            else:
                password = sys.stdin.readline()
                # endswith() also copes with an empty line (EOF on stdin).
                if password.endswith('\n'):
                    password = password[:-1]
                return password

        while 1:
            try1 = _getpass(prompt)
            if not confirm:
                return try1
            try2 = _getpass(confirmPrompt)
            if try1 == try2:
                return try1
            else:
                sys.stderr.write(mismatchMessage + "\n")
    finally:
        if old:
            sys.stdin.close()
            sys.stdin, sys.stdout = old
def dict(*a, **k):
    """
    Deprecated alias for the built-in C{dict} type.

    Emits a C{DeprecationWarning} attributed to the caller, like the other
    deprecated helpers in this module, then builds a built-in dictionary from
    the given arguments.

    @return: A new built-in C{dict}.
    """
    # This function shadows the built-in name inside this module, so the real
    # type has to be fetched from the builtins module.
    try:
        import __builtin__ as builtins
    except ImportError:
        import builtins
    warnings.warn(
        'twisted.python.util.dict is deprecated.  Use __builtin__.dict instead',
        DeprecationWarning, stacklevel=2)
    return builtins.dict(*a, **k)
def println(*a):
    """
    Write the C{str()} of every argument to C{sys.stdout}, separated by
    single spaces and followed by a newline.
    """
    words = [str(item) for item in a]
    sys.stdout.write(' '.join(words) + '\n')
# XXX
# This does not belong here
# But where does it belong?

def str_xor(s, b):
    """
    XOR the ordinal of every character of C{s} with the integer C{b}.

    @param s: The string to transform.
    @param b: The integer combined with each character's ordinal.
    @return: A string of the same length as C{s}.
    """
    mixed = []
    for character in s:
        mixed.append(chr(ord(character) ^ b))
    return ''.join(mixed)
def keyed_md5(secret, challenge):
    """
    Create the keyed MD5 string for the given secret and challenge.

    Deprecated: use the stdlib module C{hmac} instead.

    @param secret: The HMAC key.
    @param challenge: The message to authenticate.
    @return: The hexadecimal HMAC-MD5 digest.
    """
    warnings.warn(
        "keyed_md5() is deprecated.  Use the stdlib module hmac instead.",
        DeprecationWarning, stacklevel=2
        )
    # Name MD5 explicitly instead of relying on hmac's implicit default
    # digest, which newer Python versions no longer provide.
    try:
        from hashlib import md5 as digestmod
    except ImportError:
        # Python 2.4 has no hashlib; hmac accepts the md5 module there.
        import md5 as digestmod
    return hmac.HMAC(secret, challenge, digestmod).hexdigest()
def makeStatBar(width, maxPosition, doneChar = '=', undoneChar = '-', currentChar = '>'):
    """
    Create a function that returns a string representing a progress bar.

    The returned function draws C{'['}, then C{int(width * position /
    maxPosition)} copies of C{doneChar}, then C{currentChar}, then C{width}
    minus that count minus 2 copies of C{undoneChar}, then C{']'}.

    @param width: The width used to size the bar.
    @param maxPosition: The position which corresponds to a complete bar.
    @param doneChar: Drawn for the completed part.
    @param undoneChar: Drawn for the remaining part.
    @param currentChar: Drawn at the current position.

    @return: A function C{statBar(position, force = 0)}.  It returns C{''}
        when the bar is identical to the one it produced previously, unless
        C{force} is true.
    """
    perPosition = width / float(maxPosition)

    # The one-element default list is deliberate: it is the bar's memory of
    # the last string it produced, private to each statBar function.
    def statBar(position, force = 0, last = ['']):
        assert len(last) == 1, "Don't mess with the last parameter."
        filled = int(perPosition * position)
        remaining = width - filled - 2
        bar = "[%s%s%s]" % (doneChar * filled, currentChar, undoneChar * remaining)
        unchanged = (bar == last[0])
        last[0] = bar
        if unchanged and not force:
            return ''
        return bar

    statBar.__doc__ = """statBar(position, force = 0) -> '[%s%s%s]'-style progress bar

    returned string is %d characters long, and the range goes from 0..%d.
    The 'position' argument is where the '%s' will be drawn.  If force is false,
    '' will be returned instead if the resulting progress bar is identical to the
    previously returned progress bar.
""" % (doneChar * 3, currentChar, undoneChar * 3, width, maxPosition, currentChar)
    return statBar
def spewer(frame, s, ignored):
    """
    A trace function for C{sys.settrace} that prints every function or method
    call.

    A frame with a local variable named C{self} is reported as a method call,
    naming the qualified class of that object and its C{id}; any other frame
    is reported as a function call with its file name and line number.

    @param frame: The frame being traced.
    @param s: The trace event name; not used.
    @param ignored: The trace event argument; not used.
    """
    from twisted.python import reflect
    code = frame.f_code
    if 'self' not in frame.f_locals:
        print('function %s in %s, line %s' % (
            code.co_name,
            code.co_filename,
            frame.f_lineno))
        return
    se = frame.f_locals['self']
    if hasattr(se, '__class__'):
        k = reflect.qual(se.__class__)
    else:
        k = reflect.qual(type(se))
    print('method %s of %s at %s' % (code.co_name, k, id(se)))
def searchupwards(start, files=(), dirs=()):
    """
    Walk upwards from start, looking for a directory containing
    all files and directories given as arguments::

        >>> searchupwards('.', ['foo.txt'], ['bar', 'bam'])

    @param start: The directory in which the search begins.
    @param files: Names which must exist in the directory being looked for.
    @param dirs: Names which must be directories in that directory.

    @return: The absolute path of the first matching directory, with a
        trailing path separator, or C{None} if there is none.
    """
    exists = os.path.exists
    isdir = os.path.isdir
    parents = os.path.abspath(start).split(os.sep)
    while parents:
        candidate = os.sep.join(parents) + os.sep
        for name in files:
            if not exists("%s%s" % (candidate, name)):
                break
        else:
            # Every required file is present; now check the directories.
            for name in dirs:
                if not isdir("%s%s" % (candidate, name)):
                    break
            else:
                return candidate
        parents.pop()
    return None
class LineLog:
    """
    A limited-size line-based log, useful for logging line-based
    protocols such as SMTP.

    When the log fills up, old entries drop off the end.

    @ivar log: The stored lines, oldest first.  In a size-limited log the
        slots not used yet hold C{None}.
    @ivar size: The capacity of the log; 0 means unlimited.
    """

    def __init__(self, size=10):
        """
        Create a new log, with size lines of storage (default 10).
        A log size of 0 (or less) means an infinite log.
        """
        size = max(size, 0)
        self.log = [None] * size
        self.size = size

    def append(self, line):
        """Add C{line}, dropping the oldest entry of a size-limited log."""
        if self.size:
            # Shift everything down by one and put the new line last.
            del self.log[0]
        self.log.append(line)

    def str(self):
        """Return the stored lines joined by newlines; false entries are skipped."""
        return '\n'.join([entry for entry in self.log if entry])

    def __getitem__(self, item):
        """Index into the stored lines; false entries are skipped."""
        return [entry for entry in self.log if entry][item]

    def clear(self):
        """Empty the log"""
        self.log = [None] * self.size
def raises(exception, f, *args, **kwargs):
    """
    Determine whether the given call raises the given exception.

    @param exception: The exception class (or tuple of classes) to look for.
    @param f: The callable to invoke with C{args} and C{kwargs}.
    @return: C{1} if C{exception} was raised, C{0} if the call returned.
        Any other exception propagates.
    """
    try:
        f(*args, **kwargs)
    except exception:
        return 1
    else:
        return 0
class IntervalDifferential:
    """
    Given a list of intervals, generate the amount of time to sleep between
    "instants".

    For example, given 7, 11 and 13, the three (infinite) sequences::

        7 14 21 28 35 ...
        11 22 33 44 ...
        13 26 39 52 ...

    will be generated, merged, and used to produce::

        (7, 0) (4, 1) (2, 2) (1, 0) (7, 0) (1, 1) (4, 2) (2, 0) (5, 1) (2, 0)

    New intervals may be added or removed as iteration proceeds using the
    proper methods.
    """

    def __init__(self, intervals, default=60):
        """
        @type intervals: C{list} of C{int}, C{long}, or C{float}
        @param intervals: The intervals between instants.  A copy is kept, so
            later changes to the argument do not affect this object.

        @type default: C{int}, C{long}, or C{float}
        @param default: The duration to generate if the intervals list
            becomes empty.
        """
        self.default = default
        self.intervals = intervals[:]

    def __iter__(self):
        """
        Return a new iterator over the (duration, interval index) pairs.
        """
        return _IntervalDifferentialIterator(self.intervals, self.default)
class _IntervalDifferentialIterator: def __init__(self, i, d):
self.intervals = [[e, e, n] for (e, n) in zip(i, range(len(i)))] self.default = d self.last = 0
def next(self): if not self.intervals: return (self.default, None) last, index = self.intervals[0][0], self.intervals[0][2] self.intervals[0][0] += self.intervals[0][1] self.intervals.sort() result = last - self.last self.last = last return result, index
def addInterval(self, i): if self.intervals: delay = self.intervals[0][0] - self.intervals[0][1] self.intervals.append([delay + i, i, len(self.intervals)]) self.intervals.sort() else: self.intervals.append([i, i, 0])
def removeInterval(self, interval): for i in range(len(self.intervals)): if self.intervals[i][1] == interval: index = self.intervals[i][2] del self.intervals[i] for i in self.intervals: if i[2] > index: i[2] -= 1 return raise ValueError, "Specified interval not in IntervalDifferential"
class FancyStrMixin:
    """
    Mixin providing a C{__str__} and C{__repr__} which show selected
    attributes.

    Set showAttributes to a sequence of strings naming attributes, OR
    sequences of (attributeName, displayName, formatCharacter).

    If the instance has a true C{fancybasename} attribute it is shown in
    place of the class name.
    """
    showAttributes = ()

    def __str__(self):
        name = None
        if hasattr(self, 'fancybasename'):
            name = self.fancybasename
        if not name:
            name = self.__class__.__name__
        pieces = ['<', name]
        for attr in self.showAttributes:
            if isinstance(attr, str):
                # A plain name: show its repr.
                pieces.append(' %s=%r' % (attr, getattr(self, attr)))
            else:
                # (attributeName, displayName, formatCharacter)
                attributeName, displayName, formatCharacter = attr[0], attr[1], attr[2]
                template = ' %s=' + formatCharacter
                pieces.append(template % (displayName, getattr(self, attributeName)))
        pieces.append('>')
        return ''.join(pieces)

    __repr__ = __str__
class FancyEqMixin:
    """
    Mixin providing C{==} and C{!=} based on a chosen set of attributes.

    Set compareAttributes to a sequence of attribute names.  While it is
    empty, an instance is only equal to itself.
    """
    compareAttributes = ()

    def __eq__(self, other):
        names = self.compareAttributes
        if not names:
            return self is other
        if not isinstance(self, other.__class__):
            # Leave the decision to the other operand.
            return NotImplemented
        mine = [getattr(self, name) for name in names]
        theirs = [getattr(other, name) for name in names]
        return mine == theirs

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
def dsu(list, key):
    """
    decorate-sort-undecorate (aka "Schwartzian transform")

    DEPRECATED. Use the built-in C{sorted()} instead.

    @param list: The sequence to sort.
    @param key: A one-argument function computing the sort key of an element.
    @return: A new list of the elements ordered by key; elements with equal
        keys keep their original relative order.
    """
    warnings.warn(("dsu is deprecated since Twisted 10.1. "
        "Use the built-in sorted() instead."), DeprecationWarning, stacklevel=2)
    # The position is part of the sort key so that ties never fall through
    # to comparing the elements themselves.
    decorated = [(key(element), position, element)
                 for position, element in enumerate(list)]
    decorated.sort()
    return [entry[2] for entry in decorated]
try:
    from twisted.python._initgroups import initgroups as _c_initgroups
except ImportError:
    _c_initgroups = None


if pwd is None or grp is None or setgroups is None or getgroups is None:
    def initgroups(uid, primaryGid):
        """
        Do nothing.

        Underlying platform support require to manipulate groups is missing.
        """
else:
    # Fallback to the inefficient Python version
    def _setgroups_until_success(l):
        """
        Call C{setgroups(l)}, dropping the last element of C{l} (in place)
        and retrying for as long as the list is rejected as too long.
        """
        while True:
            # NASTY NASTY HACK (but glibc does it so it must be okay):
            # In case sysconfig didn't give the right answer, find the limit
            # on max groups by just looping, trying to set fewer and fewer
            # groups each time until it succeeds.
            try:
                setgroups(l)
            except ValueError:
                # This exception comes from python itself restricting
                # number of groups allowed.
                if len(l) <= 1:
                    raise
                del l[-1]
            except OSError as e:
                if e.errno != errno.EINVAL or len(l) <= 1:
                    raise
                # This comes from the OS saying too many groups
                del l[-1]
            else:
                # Success, yay!
                return

    def initgroups(uid, primaryGid):
        """
        Initializes the group access list.

        If the C extension is present, we're calling it, which in turn calls
        initgroups(3).  If not, this is done by reading the group database
        /etc/group and using all groups of which C{uid} is a member.  The
        additional group C{primaryGid} is also added to the list.

        If the given user is a member of more than C{NGROUPS}, arbitrary
        groups will be silently discarded to bring the number below that
        limit.

        @type uid: C{int}
        @param uid: The UID for which to look up group information.

        @type primaryGid: C{int} or C{NoneType}
        @param primaryGid: If provided, an additional GID to include when
            setting the groups.
        """
        if _c_initgroups is not None:
            return _c_initgroups(pwd.getpwuid(uid)[0], primaryGid)
        try:
            # Try to get the maximum number of groups
            max_groups = os.sysconf("SC_NGROUPS_MAX")
        except (ValueError, OSError, AttributeError):
            # No predefined limit: the name is unknown, the value is not
            # defined, or this platform has no os.sysconf at all.
            max_groups = 0

        username = pwd.getpwuid(uid)[0]
        l = []
        if primaryGid is not None:
            l.append(primaryGid)
        for groupname, password, gid, userlist in grp.getgrall():
            if username in userlist:
                l.append(gid)
                if len(l) == max_groups:
                    break # No more groups, ignore any more
        try:
            _setgroups_until_success(l)
        except OSError as e:
            # We might be able to remove this code now that we
            # don't try to setgid/setuid even when not asked to.
            if e.errno != errno.EPERM:
                raise
            # Not permitted to change the groups: that only matters if the
            # process currently has a group outside the wanted list.
            for g in getgroups():
                if g not in l:
                    raise
def switchUID(uid, gid, euid=False):
    """
    Switch the group ID and then the user ID of the current process.

    @param uid: The user ID to switch to, or C{None} to leave it alone.
        When given, L{initgroups} is called with C{uid} and C{gid} before the
        user ID is changed.
    @param gid: The group ID to switch to, or C{None} to leave it alone.
    @param euid: If true, change the effective IDs (C{os.seteuid} and
        C{os.setegid}) instead of the real ones (C{os.setuid} and
        C{os.setgid}).
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid
    # The group is switched first.
    if gid is not None:
        changeGroup(gid)
    if uid is not None:
        initgroups(uid, gid)
        changeUser(uid)
class SubclassableCStringIO(object):
    """
    A wrapper around cStringIO to allow for subclassing.

    Every method delegates to a private C{StringIO} object created by
    C{__init__}.
    """
    __csio = None

    def __init__(self, *a, **kw):
        """
        Create the wrapped object, passing all arguments on to C{StringIO}.
        """
        try:
            from cStringIO import StringIO
        except ImportError:
            # No cStringIO on this interpreter; io.StringIO offers the same
            # methods.
            from io import StringIO
        self.__csio = StringIO(*a, **kw)

    def __iter__(self):
        return self.__csio.__iter__()

    def next(self):
        return self.__csio.next()

    def close(self):
        return self.__csio.close()

    def isatty(self):
        return self.__csio.isatty()

    def seek(self, pos, mode=0):
        return self.__csio.seek(pos, mode)

    def tell(self):
        return self.__csio.tell()

    def read(self, n=-1):
        return self.__csio.read(n)

    def readline(self, length=None):
        # cStringIO only accepts an integer here, so the None default must
        # not be forwarded.
        if length is None:
            return self.__csio.readline()
        return self.__csio.readline(length)

    def readlines(self, sizehint=0):
        return self.__csio.readlines(sizehint)

    def truncate(self, size=None):
        # As for readline: leave the argument out rather than passing None.
        if size is None:
            return self.__csio.truncate()
        return self.__csio.truncate(size)

    def write(self, s):
        return self.__csio.write(s)

    def writelines(self, list):
        return self.__csio.writelines(list)

    def flush(self):
        return self.__csio.flush()

    def getvalue(self):
        return self.__csio.getvalue()
def moduleMovedForSplit(origModuleName, newModuleName, moduleDesc,
                        projectName, projectURL, globDict):
    """
    No-op function; only present for backwards compatibility.  There is no
    reason to call this function.

    All arguments are ignored; the only effect is a C{DeprecationWarning}
    attributed to the caller.
    """
    message = "moduleMovedForSplit is deprecated since Twisted 9.0."
    warnings.warn(message, DeprecationWarning, stacklevel=2)
def untilConcludes(f, *a, **kw):
    """
    Call C{f} with the given arguments, calling it again for as long as it
    fails with an C{IOError} or C{OSError} whose first argument is
    C{errno.EINTR}.

    @param f: The callable to invoke.
    @param a: Positional arguments for C{f}.
    @param kw: Keyword arguments for C{f}.

    @return: Whatever C{f} returns.
    @raise: Any other exception raised by C{f}, unchanged.
    """
    while True:
        try:
            return f(*a, **kw)
        except (IOError, OSError) as e:
            # Guard against argument-less exceptions, so that the original
            # error propagates instead of an IndexError.
            if e.args and e.args[0] == errno.EINTR:
                continue
            raise
# The function used by unsignedID to obtain the integer id of an object.
_idFunction = id


def setIDFunction(idFunction):
    """
    Change the function used by L{unsignedID} to determine the integer id
    value of an object.  This is largely useful for testing to give
    L{unsignedID} deterministic, easily-controlled behavior.

    @param idFunction: A function with the signature of L{id}.
    @return: The previous function being used by L{unsignedID}.
    """
    global _idFunction
    previous, _idFunction = _idFunction, idFunction
    return previous
# A value about twice as large as any Python int, to which negative values
# from id() will be added, moving them into a range which should begin just
# above where positive values from id() leave off.
_HUGEINT = (sys.maxint + 1) * 2

def unsignedID(obj):
    """
    Return the id of an object as an unsigned number so that its hex
    representation makes sense.

    This is mostly necessary in Python 2.4 which implements L{id} to sometimes
    return a negative value.  Python 2.3 shares this behavior, but also
    implements hex and the %x format specifier to represent negative values as
    though they were positive ones, obscuring the behavior of L{id}.  Python
    2.5's implementation of L{id} always returns positive values.

    @param obj: The object to identify.
    @return: The value of the current id function for C{obj}, with
        C{_HUGEINT} added if that value is negative.
    """
    identifier = _idFunction(obj)
    if identifier >= 0:
        return identifier
    return identifier + _HUGEINT
def mergeFunctionMetadata(f, g):
    """
    Overwrite C{g}'s name and docstring with values from C{f}.  Update
    C{g}'s instance dictionary with C{f}'s.

    To use this function safely you must use the return value.  In Python 2.3,
    L{mergeFunctionMetadata} will create a new function.  In later versions of
    Python, C{g} will be mutated and returned.

    @param f: The function supplying the name, docstring, attributes and
        module.
    @param g: The function supplying the behavior.

    @return: A function that has C{g}'s behavior and metadata merged from
        C{f}.
    """
    try:
        g.__name__ = f.__name__
    except TypeError:
        # The name cannot be assigned: build a renamed copy of g instead.
        try:
            merged = new.function(
                g.func_code, g.func_globals,
                f.__name__, inspect.getargspec(g)[-1],
                g.func_closure)
        except TypeError:
            # No copy could be made either.  Keep g under its own name so
            # that the rest of the metadata can still be merged, instead of
            # failing below on an unbound local.
            merged = g
    else:
        merged = g
    try:
        merged.__doc__ = f.__doc__
    except (TypeError, AttributeError):
        pass
    try:
        merged.__dict__.update(g.__dict__)
        merged.__dict__.update(f.__dict__)
    except (TypeError, AttributeError):
        pass
    merged.__module__ = f.__module__
    return merged
def nameToLabel(mname):
    """
    Convert a string like a variable name into a slightly more human-friendly
    string with spaces and capitalized letters.

    @type mname: C{str}
    @param mname: The name to convert to a label.  This must be a string
        which could be used as a Python identifier.  Strings which do not take
        this form will result in unpredictable behavior.

    @rtype: C{str}
    """
    words = []
    current = ''
    previousUpper = False
    for letter in mname:
        isUpper = letter.isupper()
        if isUpper == previousUpper:
            # Same case as the previous letter: the word continues.
            current += letter
        elif not previousUpper:
            # Lower to upper always starts a new word.
            words.append(current)
            current = letter
        elif len(current) == 1:
            # A single capital followed by lowercase: an ordinary word.
            current += letter
        else:
            # An acronym followed by a capitalized word; the acronym's last
            # capital really belongs to the word now starting.
            words.append(current[:-1])
            current = current[-1] + letter
        previousUpper = isUpper
    if not words:
        return mname.capitalize()
    words[0] = words[0].capitalize()
    words.append(current)
    return ' '.join(words)
def uidFromString(uidString):
    """
    Convert a user identifier, as a string, into an integer UID.

    @type uidString: C{str}
    @param uidString: A string giving the base-ten representation of a UID or
        the name of a user which can be converted to a UID via
        L{pwd.getpwnam}.

    @rtype: C{int}
    @return: The integer UID corresponding to the given string.

    @raise ValueError: If the user name is supplied and L{pwd} is not
        available.
    """
    try:
        uid = int(uidString)
    except ValueError:
        if pwd is not None:
            # Not a number, so treat it as a user name.
            return pwd.getpwnam(uidString).pw_uid
        raise
    else:
        return uid
def gidFromString(gidString):
    """
    Convert a group identifier, as a string, into an integer GID.

    @type gidString: C{str}
    @param gidString: A string giving the base-ten representation of a GID or
        the name of a group which can be converted to a GID via
        L{grp.getgrnam}.

    @rtype: C{int}
    @return: The integer GID corresponding to the given string.

    @raise ValueError: If the group name is supplied and L{grp} is not
        available.
    """
    try:
        gid = int(gidString)
    except ValueError:
        if grp is not None:
            # Not a number, so treat it as a group name.
            return grp.getgrnam(gidString).gr_gid
        raise
    else:
        return gid
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
    """
    Run the given function wrapped with seteuid/setegid calls.

    This will try to minimize the number of seteuid/setegid calls, comparing
    current and wanted permissions.

    @param euid: effective UID used to call the function.
    @type euid: C{int}

    @param egid: effective GID used to call the function.
    @type egid: C{int}

    @param function: the function run with the specific permission.
    @type function: any callable

    @param *args: arguments passed to C{function}
    @param **kwargs: keyword arguments passed to C{function}

    @return: Whatever C{function} returns.
    """
    uid, gid = os.geteuid(), os.getegid()
    if uid == euid and gid == egid:
        # Already running with the wanted ids.
        return function(*args, **kwargs)

    # From here on at least one of the two ids differs from the wanted one.
    groupDiffers = gid != egid
    if uid != 0:
        os.seteuid(0)
    if groupDiffers:
        os.setegid(egid)
    if euid != 0:
        os.seteuid(euid)
    try:
        return function(*args, **kwargs)
    finally:
        # Undo the switch the same way: through uid 0, group first.
        if euid != 0:
            os.seteuid(0)
        if groupDiffers:
            os.setegid(gid)
        if uid != 0:
            os.seteuid(uid)
# The names exported by "from twisted.python.util import *".
__all__ = [
    "uniquify", "padTo", "getPluginDirs", "addPluginDir", "sibpath",
    "getPassword", "dict", "println", "keyed_md5", "makeStatBar",
    "OrderedDict", "InsensitiveDict", "spewer", "searchupwards", "LineLog",
    "raises", "IntervalDifferential", "FancyStrMixin", "FancyEqMixin",
    "dsu", "switchUID", "SubclassableCStringIO", "moduleMovedForSplit",
    "unsignedID", "mergeFunctionMetadata", "nameToLabel", "uidFromString",
    "gidFromString", "runAsEffectiveUser",
]
|