Viewing file: util.py (7.04 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) 2004 Divmod. # See LICENSE for details.
import inspect, os.path
class UnexposedMethodError(Exception):
    """
    Signals that a method was requested which has not been exposed.
    """
class Expose(object):
    """
    Helper for exposing methods for various uses using a simple
    decorator-style callable.

    Instances of this class can be called with one or more functions as
    positional arguments.  Each of those functions is tagged, through an
    C{exposedThrough} list attribute set on the function object itself, with
    the L{Expose} instance it was exposed through.

    @ivar doc: The value given to the initializer.  It is only stored; this
        class does not otherwise use it.
    """

    def __init__(self, doc=None):
        self.doc = doc


    def __call__(self, *funcObjs):
        """
        Add one or more functions to the set of exposed functions.

        This is a way to declare something about a class definition, similar
        to L{zope.interface.implements}.  Use it like this::

            | magic = Expose('perform extra magic')
            | class Foo(Bar):
            |     def twiddle(self, x, y):
            |         ...
            |     def frob(self, a, b):
            |         ...
            |     magic(twiddle, frob)

        Later you can query the object::

            | aFoo = Foo()
            | magic.get(aFoo, 'twiddle')(x=1, y=2)

        The call to C{get} will fail if the name it is given has not been
        exposed using C{magic}.

        @param funcObjs: One or more function objects which will be exposed
            to the client.

        @raise TypeError: If no function objects are given.

        @return: The first of C{funcObjs}.
        """
        if not funcObjs:
            raise TypeError("expose() takes at least 1 argument (0 given)")
        for fObj in funcObjs:
            # Create the tracking list on first exposure, then record this
            # exposer in it.  One function may be exposed through several
            # Expose instances.
            fObj.exposedThrough = getattr(fObj, 'exposedThrough', [])
            fObj.exposedThrough.append(self)
        return funcObjs[0]


    def exposedMethodNames(self, instance):
        """
        Return an iterator of the names of the methods which are exposed on
        the given instance.

        @param instance: The object whose routines are inspected.
        """
        for name, member in inspect.getmembers(instance, inspect.isroutine):
            if self in getattr(member, 'exposedThrough', []):
                yield name


    # Sentinel distinguishing "no default supplied" from an explicit None.
    _nodefault = object()

    def get(self, instance, methodName, default=_nodefault):
        """
        Retrieve an exposed method with the given name from the given
        instance.

        @param instance: The object on which to look up the method.

        @param methodName: The name of the attribute to look up.

        @param default: Value returned instead of raising when the method is
            missing or is not exposed through this L{Expose} instance.

        @raise UnexposedMethodError: Raised if C{default} is not specified and
            there is no exposed method with the given name.

        @return: A callable object for the named method assigned to the given
            instance.
        """
        method = getattr(instance, methodName, None)
        if self in getattr(method, 'exposedThrough', []):
            return method
        if default is self._nodefault:
            raise UnexposedMethodError(self, methodName)
        return default
def escapeToXML(text, isattrib = False):
    """Borrowed from twisted.xish.domish

    Escape text to proper XML form, per the XML specification.

    @type text: L{str}
    @param text: Text to escape

    @type isattrib: L{bool}
    @param isattrib: Triggers escaping of characters necessary for use as
        attribute values

    @return: C{text} with markup-significant characters replaced by the
        corresponding predefined XML entities.
    """
    # The ampersand must be handled first; otherwise the ampersands that the
    # later replacements introduce would themselves be escaped again.
    text = text.replace("&", "&amp;")
    text = text.replace("<", "&lt;")
    text = text.replace(">", "&gt;")
    if isattrib:
        # Quote characters only need escaping inside attribute values.
        text = text.replace("'", "&apos;")
        text = text.replace("\"", "&quot;")
    return text
def getPOSTCharset(ctx):
    """Locate the unicode encoding of the POST'ed form data.

    To work reliably you must do the following:

      - set the form's enctype attribute to 'multipart/form-data'
      - set the form's accept-charset attribute, probably to 'utf-8'
      - add a hidden form field called '_charset_'

    For instance::

      <form action="foo" method="post" enctype="multipart/form-data" accept-charset="utf-8">
        <input type="hidden" name="_charset_" />
        ...
      </form>

    @param ctx: The context object; it is adapted to L{inevow.IRequest}.

    @return: The first non-empty charset found by looking at, in order, the
        '_charset_' form field and the C{charset} parameter of the
        'content-type' request header; C{'utf-8'} if neither supplies one.
    """

    from nevow import inevow

    request = inevow.IRequest(ctx)

    # Try the magic '_charset_' field, Mozilla and IE set this.
    charset = request.args.get('_charset_',[None])[0]
    if charset:
        return charset

    # Look in the 'content-type' request header
    contentType = request.received_headers.get('content-type')
    if contentType:
        # Skip the media type itself; what follows are ';'-separated
        # name=value parameters.
        for param in contentType.split(';')[1:]:
            if '=' not in param:
                # A malformed or empty parameter must not abort the lookup.
                continue
            # Split on the first '=' only: values such as a multipart
            # boundary may themselves contain '='.
            name, value = param.split('=', 1)
            if name.strip().lower() != 'charset':
                continue
            # The value may be sent as a quoted string.
            value = value.strip().strip('"')
            if value:
                return value

    return 'utf-8'
from twisted.python.reflect import qual, namedAny, allYourBase, accumulateBases from twisted.python.util import uniquify
from twisted.internet.defer import Deferred, succeed, maybeDeferred, DeferredList from twisted.python import failure from twisted.python.failure import Failure from twisted.python import log
## The tests rely on these, but they should be removed ASAP
def remainingSegmentsFactory(ctx):
    """
    Return C{ctx.tag.postpath} converted to a tuple.
    """
    postpath = ctx.tag.postpath
    return tuple(postpath)
def currentSegmentsFactory(ctx):
    """
    Return C{ctx.tag.prepath} converted to a tuple.
    """
    prepath = ctx.tag.prepath
    return tuple(prepath)
class _RandomClazz(object): pass class _NamedAnyError(Exception): 'Internal error for when importing fails.'
def _namedAnyWithBuiltinTranslation(name):
    """
    Resolve C{name} with C{namedAny}, first rewriting a handful of
    C{__builtin__} type names to their C{types} module equivalents.

    @param name: A fully-qualified name.

    @return: L{_RandomClazz} for C{'__builtin__.method'}; otherwise whatever
        C{namedAny} returns for the (possibly rewritten) name.
    """
    if name == '__builtin__.method':
        return _RandomClazz # Hack

    translations = {
        '__builtin__.function': 'types.FunctionType',
        '__builtin__.instancemethod': 'types.MethodType',
        '__builtin__.NoneType': 'types.NoneType',
        '__builtin__.generator': 'types.GeneratorType',
    }
    # Names without a translation are looked up unchanged.
    return namedAny(translations.get(name, name))
# Prefer resource_filename from setuptools's pkg_resources module because it
# handles resources in .zip files.  When that module cannot be imported,
# provide a version that assumes the resource is directly available on the
# filesystem.
try:
    from pkg_resources import resource_filename
except ImportError:
    def resource_filename(modulename, resource_name):
        """
        Return the path of C{resource_name} inside the directory that holds
        the file of the module named C{modulename}.
        """
        moduleFile = os.path.abspath(namedAny(modulename).__file__)
        moduleDir = os.path.dirname(moduleFile)
        return os.path.join(moduleDir, resource_name)
class CachedFile(object):
    """
    Caches the result of loading a file, reloading it only when the file's
    modification time changes or the cache has been invalidated.
    """

    def __init__(self, path, loader):
        """
        @type path: L{str}
        @param path: The path to the associated file in the filesystem.

        @param loader: A callable that returns the relevant data; invoked
            with the path (plus any extra arguments given to L{load}) when
            the cache is empty or stale.
        """
        self.path = path
        self.loader = loader
        self.invalidate()


    def invalidate(self):
        """
        Drop the remembered modification time so that the next call to
        L{load} goes back to the loader.
        """
        self._mtime = None


    def load(self, *args, **kwargs):
        """
        Return the loaded object for this file, calling the loader only if
        nothing is cached yet or the file's modification time differs from
        the one recorded at the previous load.

        Any positional or keyword arguments are passed along to the loader
        callable, after the path itself.
        """
        mtimeNow = os.path.getmtime(self.path)
        upToDate = self._mtime is not None and mtimeNow == self._mtime
        if not upToDate:
            self._cachedObj = self.loader(self.path, *args, **kwargs)
            self._mtime = mtimeNow
        return self._cachedObj
|