Viewing file: unittest.py (56.42 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- test-case-name: twisted.trial.test.test_tests -*- # Copyright (c) 2001-2009 Twisted Matrix Laboratories. # See LICENSE for details.
""" Things likely to be used by writers of unit tests.
Maintainer: Jonathan Lange """
import doctest, inspect import os, warnings, sys, tempfile, gc, types from pprint import pformat try: from dis import findlinestarts as _findlinestarts except ImportError: # Definition copied from Python's Lib/dis.py - findlinestarts was not # available in Python 2.3. This function is copyright Python Software # Foundation, released under the Python license: # http://www.python.org/psf/license/ def _findlinestarts(code): """Find the offsets in a byte code which are start of lines in the source.
Generate pairs (offset, lineno) as described in Python/compile.c.
""" byte_increments = [ord(c) for c in code.co_lnotab[0::2]] line_increments = [ord(c) for c in code.co_lnotab[1::2]]
lastlineno = None lineno = code.co_firstlineno addr = 0 for byte_incr, line_incr in zip(byte_increments, line_increments): if byte_incr: if lineno != lastlineno: yield (addr, lineno) lastlineno = lineno addr += byte_incr lineno += line_incr if lineno != lastlineno: yield (addr, lineno)
from twisted.internet import defer, utils from twisted.python import components, failure, log, monkey from twisted.python.deprecate import getDeprecationWarningString
from twisted.trial import itrial, reporter, util
pyunit = __import__('unittest')
from zope.interface import implements
class SkipTest(Exception): """ Raise this (with a reason) to skip the current test. You may also set method.skip to a reason string to skip it, or set class.skip to skip the entire TestCase. """
class FailTest(AssertionError): """Raised to indicate the current test has failed to pass."""
class Todo(object): """ Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo' are reported differently in Trial L{TestResult}s. If todo'd tests fail, they do not fail the suite and the errors are reported in a separate category. If todo'd tests succeed, Trial L{TestResult}s will report an unexpected success. """
def __init__(self, reason, errors=None): """ @param reason: A string explaining why the test is marked 'todo'
@param errors: An iterable of exception types that the test is expected to raise. If one of these errors is raised by the test, it will be trapped. Raising any other kind of error will fail the test. If C{None} is passed, then all errors will be trapped. """ self.reason = reason self.errors = errors
def __repr__(self): return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
def expected(self, failure): """ @param failure: A L{twisted.python.failure.Failure}.
@return: C{True} if C{failure} is expected, C{False} otherwise. """ if self.errors is None: return True for error in self.errors: if failure.check(error): return True return False
def makeTodo(value): """ Return a L{Todo} object built from C{value}.
If C{value} is a string, return a Todo that expects any exception with C{value} as a reason. If C{value} is a tuple, the second element is used as the reason and the first element as the excepted error(s).
@param value: A string or a tuple of C{(errors, reason)}, where C{errors} is either a single exception class or an iterable of exception classes.
@return: A L{Todo} object. """ if isinstance(value, str): return Todo(reason=value) if isinstance(value, tuple): errors, reason = value try: errors = list(errors) except TypeError: errors = [errors] return Todo(reason=reason, errors=errors)
class _Warning(object): """ A L{_Warning} instance represents one warning emitted through the Python warning system (L{warnings}). This is used to insulate callers of L{_collectWarnings} from changes to the Python warnings system which might otherwise require changes to the warning objects that function passes to the observer object it accepts.
@ivar message: The string which was passed as the message parameter to L{warnings.warn}.
@ivar category: The L{Warning} subclass which was passed as the category parameter to L{warnings.warn}.
@ivar filename: The name of the file containing the definition of the code object which was C{stacklevel} frames above the call to L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} parameter passed to L{warnings.warn}.
@ivar lineno: The source line associated with the active instruction of the code object object which was C{stacklevel} frames above the call to L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} parameter passed to L{warnings.warn}. """ def __init__(self, message, category, filename, lineno): self.message = message self.category = category self.filename = filename self.lineno = lineno
def _collectWarnings(observeWarning, f, *args, **kwargs): """ Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments and collect all warnings which are emitted as a result in a list.
@param observeWarning: A callable which will be invoked with a L{_Warning} instance each time a warning is emitted.
@return: The return value of C{f(*args, **kwargs)}. """ def showWarning(message, category, filename, lineno, file=None, line=None): assert isinstance(message, Warning) observeWarning(_Warning( message.args[0], category, filename, lineno))
# Disable the per-module cache for every module otherwise if the warning # which the caller is expecting us to collect was already emitted it won't # be re-emitted by the call to f which happens below. for v in sys.modules.itervalues(): if v is not None: try: v.__warningregistry__ = None except: # Don't specify a particular exception type to handle in case # some wacky object raises some wacky exception in response to # the setattr attempt. pass
origFilters = warnings.filters[:] origShow = warnings.showwarning warnings.simplefilter('always') try: warnings.showwarning = showWarning result = f(*args, **kwargs) finally: warnings.filters[:] = origFilters warnings.showwarning = origShow return result
class _Assertions(pyunit.TestCase, object): """ Replaces many of the built-in TestCase assertions. In general, these assertions provide better error messages and are easier to use in callbacks. Also provides new assertions such as L{failUnlessFailure}.
Although the tests are defined as 'failIf*' and 'failUnless*', they can also be called as 'assertNot*' and 'assert*'. """
def fail(self, msg=None): """ Absolutely fail the test. Do not pass go, do not collect $200.
@param msg: the message that will be displayed as the reason for the failure """ raise self.failureException(msg)
def failIf(self, condition, msg=None): """ Fail the test if C{condition} evaluates to True.
@param condition: any object that defines __nonzero__ """ if condition: raise self.failureException(msg) return condition assertNot = assertFalse = failUnlessFalse = failIf
def failUnless(self, condition, msg=None): """ Fail the test if C{condition} evaluates to False.
@param condition: any object that defines __nonzero__ """ if not condition: raise self.failureException(msg) return condition assert_ = assertTrue = failUnlessTrue = failUnless
def failUnlessRaises(self, exception, f, *args, **kwargs): """ Fail the test unless calling the function C{f} with the given C{args} and C{kwargs} raises C{exception}. The failure will report the traceback and call stack of the unexpected exception.
@param exception: exception type that is to be expected @param f: the function to call
@return: The raised exception instance, if it is of the given type. @raise self.failureException: Raised if the function call does not raise an exception or if it raises an exception of a different type. """ try: result = f(*args, **kwargs) except exception, inst: return inst except: raise self.failureException('%s raised instead of %s:\n %s' % (sys.exc_info()[0], exception.__name__, failure.Failure().getTraceback())) else: raise self.failureException('%s not raised (%r returned)' % (exception.__name__, result)) assertRaises = failUnlessRaises
def failUnlessEqual(self, first, second, msg=''): """ Fail the test if C{first} and C{second} are not equal.
@param msg: A string describing the failure that's included in the exception. """ if not first == second: if msg is None: msg = '' if len(msg) > 0: msg += '\n' raise self.failureException( '%snot equal:\na = %s\nb = %s\n' % (msg, pformat(first), pformat(second))) return first assertEqual = assertEquals = failUnlessEquals = failUnlessEqual
def failUnlessIdentical(self, first, second, msg=None): """ Fail the test if C{first} is not C{second}. This is an obect-identity-equality test, not an object equality (i.e. C{__eq__}) test.
@param msg: if msg is None, then the failure message will be '%r is not %r' % (first, second) """ if first is not second: raise self.failureException(msg or '%r is not %r' % (first, second)) return first assertIdentical = failUnlessIdentical
def failIfIdentical(self, first, second, msg=None): """ Fail the test if C{first} is C{second}. This is an obect-identity-equality test, not an object equality (i.e. C{__eq__}) test.
@param msg: if msg is None, then the failure message will be '%r is %r' % (first, second) """ if first is second: raise self.failureException(msg or '%r is %r' % (first, second)) return first assertNotIdentical = failIfIdentical
def failIfEqual(self, first, second, msg=None): """ Fail the test if C{first} == C{second}.
@param msg: if msg is None, then the failure message will be '%r == %r' % (first, second) """ if not first != second: raise self.failureException(msg or '%r == %r' % (first, second)) return first assertNotEqual = assertNotEquals = failIfEquals = failIfEqual
def failUnlessIn(self, containee, container, msg=None): """ Fail the test if C{containee} is not found in C{container}.
@param containee: the value that should be in C{container} @param container: a sequence type, or in the case of a mapping type, will follow semantics of 'if key in dict.keys()' @param msg: if msg is None, then the failure message will be '%r not in %r' % (first, second) """ if containee not in container: raise self.failureException(msg or "%r not in %r" % (containee, container)) return containee assertIn = failUnlessIn
def failIfIn(self, containee, container, msg=None): """ Fail the test if C{containee} is found in C{container}.
@param containee: the value that should not be in C{container} @param container: a sequence type, or in the case of a mapping type, will follow semantics of 'if key in dict.keys()' @param msg: if msg is None, then the failure message will be '%r in %r' % (first, second) """ if containee in container: raise self.failureException(msg or "%r in %r" % (containee, container)) return containee assertNotIn = failIfIn
def failIfAlmostEqual(self, first, second, places=7, msg=None): """ Fail if the two objects are equal as determined by their difference rounded to the given number of decimal places (default 7) and comparing to zero.
@note: decimal places (from zero) is usually not the same as significant digits (measured from the most signficant digit).
@note: included for compatiblity with PyUnit test cases """ if round(second-first, places) == 0: raise self.failureException(msg or '%r == %r within %r places' % (first, second, places)) return first assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual failIfAlmostEquals = failIfAlmostEqual
def failUnlessAlmostEqual(self, first, second, places=7, msg=None): """ Fail if the two objects are unequal as determined by their difference rounded to the given number of decimal places (default 7) and comparing to zero.
@note: decimal places (from zero) is usually not the same as significant digits (measured from the most signficant digit).
@note: included for compatiblity with PyUnit test cases """ if round(second-first, places) != 0: raise self.failureException(msg or '%r != %r within %r places' % (first, second, places)) return first assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual failUnlessAlmostEquals = failUnlessAlmostEqual
def failUnlessApproximates(self, first, second, tolerance, msg=None): """ Fail if C{first} - C{second} > C{tolerance}
@param msg: if msg is None, then the failure message will be '%r ~== %r' % (first, second) """ if abs(first - second) > tolerance: raise self.failureException(msg or "%s ~== %s" % (first, second)) return first assertApproximates = failUnlessApproximates
def failUnlessFailure(self, deferred, *expectedFailures): """ Fail if C{deferred} does not errback with one of C{expectedFailures}. Returns the original Deferred with callbacks added. You will need to return this Deferred from your test case. """ def _cb(ignore): raise self.failureException( "did not catch an error, instead got %r" % (ignore,))
def _eb(failure): if failure.check(*expectedFailures): return failure.value else: output = ('\nExpected: %r\nGot:\n%s' % (expectedFailures, str(failure))) raise self.failureException(output) return deferred.addCallbacks(_cb, _eb) assertFailure = failUnlessFailure
def failUnlessSubstring(self, substring, astring, msg=None): """ Fail if C{substring} does not exist within C{astring}. """ return self.failUnlessIn(substring, astring, msg) assertSubstring = failUnlessSubstring
def failIfSubstring(self, substring, astring, msg=None): """ Fail if C{astring} contains C{substring}. """ return self.failIfIn(substring, astring, msg) assertNotSubstring = failIfSubstring
def failUnlessWarns(self, category, message, filename, f, *args, **kwargs): """ Fail if the given function doesn't generate the specified warning when called. It calls the function, checks the warning, and forwards the result of the function if everything is fine.
@param category: the category of the warning to check. @param message: the output message of the warning to check. @param filename: the filename where the warning should come from. @param f: the function which is supposed to generate the warning. @type f: any callable. @param args: the arguments to C{f}. @param kwargs: the keywords arguments to C{f}.
@return: the result of the original function C{f}. """ warningsShown = [] result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
if not warningsShown: self.fail("No warnings emitted") first = warningsShown[0] for other in warningsShown[1:]: if ((other.message, other.category) != (first.message, first.category)): self.fail("Can't handle different warnings") self.assertEqual(first.message, message) self.assertIdentical(first.category, category)
# Use starts with because of .pyc/.pyo issues. self.failUnless( filename.startswith(first.filename), 'Warning in %r, expected %r' % (first.filename, filename))
# It would be nice to be able to check the line number as well, but # different configurations actually end up reporting different line # numbers (generally the variation is only 1 line, but that's enough # to fail the test erroneously...). # self.assertEqual(lineno, xxx)
return result assertWarns = failUnlessWarns
def failUnlessIsInstance(self, instance, classOrTuple): """ Fail if C{instance} is not an instance of the given class or of one of the given classes.
@param instance: the object to test the type (first argument of the C{isinstance} call). @type instance: any. @param classOrTuple: the class or classes to test against (second argument of the C{isinstance} call). @type classOrTuple: class, type, or tuple. """ if not isinstance(instance, classOrTuple): self.fail("%r is not an instance of %s" % (instance, classOrTuple)) assertIsInstance = failUnlessIsInstance
def failIfIsInstance(self, instance, classOrTuple): """ Fail if C{instance} is not an instance of the given class or of one of the given classes.
@param instance: the object to test the type (first argument of the C{isinstance} call). @type instance: any. @param classOrTuple: the class or classes to test against (second argument of the C{isinstance} call). @type classOrTuple: class, type, or tuple. """ if isinstance(instance, classOrTuple): self.fail("%r is an instance of %s" % (instance, classOrTuple)) assertNotIsInstance = failIfIsInstance
class _LogObserver(object): """ Observes the Twisted logs and catches any errors.
@ivar _errors: A C{list} of L{Failure} instances which were received as error events from the Twisted logging system.
@ivar _added: A C{int} giving the number of times C{_add} has been called less the number of times C{_remove} has been called; used to only add this observer to the Twisted logging since once, regardless of the number of calls to the add method.
@ivar _ignored: A C{list} of exception types which will not be recorded. """
def __init__(self): self._errors = [] self._added = 0 self._ignored = []
def _add(self): if self._added == 0: log.addObserver(self.gotEvent) self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors) self._oldIE, log._ignore = (log._ignore, self._ignoreErrors) self._oldCI, log._clearIgnores = (log._clearIgnores, self._clearIgnores) self._added += 1
def _remove(self): self._added -= 1 if self._added == 0: log.removeObserver(self.gotEvent) log._flushErrors = self._oldFE log._ignore = self._oldIE log._clearIgnores = self._oldCI
def _ignoreErrors(self, *errorTypes): """ Do not store any errors with any of the given types. """ self._ignored.extend(errorTypes)
def _clearIgnores(self): """ Stop ignoring any errors we might currently be ignoring. """ self._ignored = []
def flushErrors(self, *errorTypes): """ Flush errors from the list of caught errors. If no arguments are specified, remove all errors. If arguments are specified, only remove errors of those types from the stored list. """ if errorTypes: flushed = [] remainder = [] for f in self._errors: if f.check(*errorTypes): flushed.append(f) else: remainder.append(f) self._errors = remainder else: flushed = self._errors self._errors = [] return flushed
def getErrors(self): """ Return a list of errors caught by this observer. """ return self._errors
def gotEvent(self, event): """ The actual observer method. Called whenever a message is logged.
@param event: A dictionary containing the log message. Actual structure undocumented (see source for L{twisted.python.log}). """ if event.get('isError', False) and 'failure' in event: f = event['failure'] if len(self._ignored) == 0 or not f.check(*self._ignored): self._errors.append(f)
_logObserver = _LogObserver()
_wait_is_running = []
class TestCase(_Assertions): """ A unit test. The atom of the unit testing universe.
This class extends C{unittest.TestCase} from the standard library. The main feature is the ability to return C{Deferred}s from tests and fixture methods and to have the suite wait for those C{Deferred}s to fire.
To write a unit test, subclass C{TestCase} and define a method (say, 'test_foo') on the subclass. To run the test, instantiate your subclass with the name of the method, and call L{run} on the instance, passing a L{TestResult} object.
The C{trial} script will automatically find any C{TestCase} subclasses defined in modules beginning with 'test_' and construct test cases for all methods beginning with 'test'.
If an error is logged during the test run, the test will fail with an error. See L{log.err}.
@ivar failureException: An exception class, defaulting to C{FailTest}. If the test method raises this exception, it will be reported as a failure, rather than an exception. All of the assertion methods raise this if the assertion fails.
@ivar skip: C{None} or a string explaining why this test is to be skipped. If defined, the test will not be run. Instead, it will be reported to the result object as 'skipped' (if the C{TestResult} supports skipping).
@ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be passed to C{warnings.filterwarnings}. Use these to suppress warnings raised in a test. Useful for testing deprecated code. See also L{util.suppress}.
@ivar timeout: A real number of seconds. If set, the test will raise an error if it takes longer than C{timeout} seconds. If not set, util.DEFAULT_TIMEOUT_DURATION is used.
@ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where C{errors} is either an exception class or an iterable of exception classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more information. """
implements(itrial.ITestCase) failureException = FailTest
def __init__(self, methodName='runTest'): """ Construct an asynchronous test case for C{methodName}.
@param methodName: The name of a method on C{self}. This method should be a unit test. That is, it should be a short method that calls some of the assert* methods. If C{methodName} is unspecified, L{runTest} will be used as the test method. This is mostly useful for testing Trial. """ super(TestCase, self).__init__(methodName) self._testMethodName = methodName testMethod = getattr(self, methodName) self._parents = [testMethod, self] self._parents.extend(util.getPythonContainers(testMethod)) self._passed = False self._cleanups = []
if sys.version_info >= (2, 6): # Override the comparison defined by the base TestCase which considers # instances of the same class with the same _testMethodName to be # equal. Since trial puts TestCase instances into a set, that # definition of comparison makes it impossible to run the same test # method twice. Most likely, trial should stop using a set to hold # tests, but until it does, this is necessary on Python 2.6. Only # __eq__ and __ne__ are required here, not __hash__, since the # inherited __hash__ is compatible with these equality semantics. A # different __hash__ might be slightly more efficient (by reducing # collisions), but who cares? -exarkun def __eq__(self, other): return self is other
def __ne__(self, other): return self is not other
def _run(self, methodName, result): from twisted.internet import reactor timeout = self.getTimeout() def onTimeout(d): e = defer.TimeoutError("%r (%s) still running at %s secs" % (self, methodName, timeout)) f = failure.Failure(e) # try to errback the deferred that the test returns (for no gorram # reason) (see issue1005 and test_errorPropagation in # test_deferred) try: d.errback(f) except defer.AlreadyCalledError: # if the deferred has been called already but the *back chain # is still unfinished, crash the reactor and report timeout # error ourself. reactor.crash() self._timedOut = True # see self._wait todo = self.getTodo() if todo is not None and todo.expected(f): result.addExpectedFailure(self, f, todo) else: result.addError(self, f) onTimeout = utils.suppressWarnings( onTimeout, util.suppress(category=DeprecationWarning)) method = getattr(self, methodName) d = defer.maybeDeferred(utils.runWithWarningsSuppressed, self.getSuppress(), method) call = reactor.callLater(timeout, onTimeout, d) d.addBoth(lambda x : call.active() and call.cancel() or x) return d
def shortDescription(self): desc = super(TestCase, self).shortDescription() if desc is None: return self._testMethodName return desc
def __call__(self, *args, **kwargs): return self.run(*args, **kwargs)
def deferSetUp(self, ignored, result): d = self._run('setUp', result) d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp, callbackArgs=(result,), errbackArgs=(result,)) return d
def _ebDeferSetUp(self, failure, result): if failure.check(SkipTest): result.addSkip(self, self._getReason(failure)) else: result.addError(self, failure) if failure.check(KeyboardInterrupt): result.stop() return self.deferRunCleanups(None, result)
def deferTestMethod(self, ignored, result): d = self._run(self._testMethodName, result) d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod, callbackArgs=(result,), errbackArgs=(result,)) d.addBoth(self.deferRunCleanups, result) d.addBoth(self.deferTearDown, result) return d
def _cbDeferTestMethod(self, ignored, result): if self.getTodo() is not None: result.addUnexpectedSuccess(self, self.getTodo()) else: self._passed = True return ignored
def _ebDeferTestMethod(self, f, result): todo = self.getTodo() if todo is not None and todo.expected(f): result.addExpectedFailure(self, f, todo) elif f.check(self.failureException, FailTest): result.addFailure(self, f) elif f.check(KeyboardInterrupt): result.addError(self, f) result.stop() elif f.check(SkipTest): result.addSkip(self, self._getReason(f)) else: result.addError(self, f)
def deferTearDown(self, ignored, result): d = self._run('tearDown', result) d.addErrback(self._ebDeferTearDown, result) return d
def _ebDeferTearDown(self, failure, result): result.addError(self, failure) if failure.check(KeyboardInterrupt): result.stop() self._passed = False
def deferRunCleanups(self, ignored, result): """ Run any scheduled cleanups and report errors (if any to the result object. """ d = self._runCleanups() d.addCallback(self._cbDeferRunCleanups, result) return d
def _cbDeferRunCleanups(self, cleanupResults, result): for flag, failure in cleanupResults: if flag == defer.FAILURE: result.addError(self, failure) if failure.check(KeyboardInterrupt): result.stop() self._passed = False
def _cleanUp(self, result): try: clean = util._Janitor(self, result).postCaseCleanup() if not clean: self._passed = False except: result.addError(self, failure.Failure()) self._passed = False for error in self._observer.getErrors(): result.addError(self, error) self._passed = False self.flushLoggedErrors() self._removeObserver() if self._passed: result.addSuccess(self)
def _classCleanUp(self, result): try: util._Janitor(self, result).postClassCleanup() except: result.addError(self, failure.Failure())
def _makeReactorMethod(self, name): """ Create a method which wraps the reactor method C{name}. The new method issues a deprecation warning and calls the original. """ def _(*a, **kw): warnings.warn("reactor.%s cannot be used inside unit tests. " "In the future, using %s will fail the test and may " "crash or hang the test run." % (name, name), stacklevel=2, category=DeprecationWarning) return self._reactorMethods[name](*a, **kw) return _
def _deprecateReactor(self, reactor): """ Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is, each method is wrapped in a function that issues a deprecation warning, then calls the original.
@param reactor: The Twisted reactor. """ self._reactorMethods = {} for name in ['crash', 'iterate', 'stop']: self._reactorMethods[name] = getattr(reactor, name) setattr(reactor, name, self._makeReactorMethod(name))
def _undeprecateReactor(self, reactor): """ Restore the deprecated reactor methods. Undoes what L{_deprecateReactor} did.
@param reactor: The Twisted reactor. """ for name, method in self._reactorMethods.iteritems(): setattr(reactor, name, method) self._reactorMethods = {}
def _installObserver(self): self._observer = _logObserver self._observer._add()
def _removeObserver(self): self._observer._remove()
def flushLoggedErrors(self, *errorTypes): """ Remove stored errors received from the log.
C{TestCase} stores each error logged during the run of the test and reports them as errors during the cleanup phase (after C{tearDown}).
@param *errorTypes: If unspecifed, flush all errors. Otherwise, only flush errors that match the given types.
@return: A list of failures that have been removed. """ return self._observer.flushErrors(*errorTypes)
def flushWarnings(self, offendingFunctions=None): """ Remove stored warnings from the list of captured warnings and return them.
@param offendingFunctions: If C{None}, all warnings issued during the currently running test will be flushed. Otherwise, only warnings which I{point} to a function included in this list will be flushed. All warnings include a filename and source line number; if these parts of a warning point to a source line which is part of a function, then the warning I{points} to that function. @type offendingFunctions: L{NoneType} or L{list} of functions or methods.
@raise ValueError: If C{offendingFunctions} is not C{None} and includes an object which is not a L{FunctionType} or L{MethodType} instance.
@return: A C{list}, each element of which is a C{dict} giving information about one warning which was flushed by this call. The keys of each C{dict} are:
- C{'message'}: The string which was passed as the I{message} parameter to L{warnings.warn}.
- C{'category'}: The warning subclass which was passed as the I{category} parameter to L{warnings.warn}.
- C{'filename'}: The name of the file containing the definition of the code object which was C{stacklevel} frames above the call to L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} parameter passed to L{warnings.warn}.
- C{'lineno'}: The source line associated with the active instruction of the code object object which was C{stacklevel} frames above the call to L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel} parameter passed to L{warnings.warn}. """ if offendingFunctions is None: toFlush = self._warnings[:] self._warnings[:] = [] else: toFlush = [] for aWarning in self._warnings: for aFunction in offendingFunctions: if not isinstance(aFunction, ( types.FunctionType, types.MethodType)): raise ValueError("%r is not a function or method" % ( aFunction,))
# inspect.getabsfile(aFunction) sometimes returns a # filename which disagrees with the filename the warning # system generates. This seems to be because a # function's code object doesn't deal with source files # being renamed. inspect.getabsfile(module) seems # better (or at least agrees with the warning system # more often), and does some normalization for us which # is desirable. inspect.getmodule() is attractive, but # somewhat broken in Python 2.3. See Python bug 4845. aModule = sys.modules[aFunction.__module__] filename = inspect.getabsfile(aModule)
if filename != os.path.normcase(aWarning.filename): continue lineStarts = list(_findlinestarts(aFunction.func_code)) first = lineStarts[0][1] last = lineStarts[-1][1] if not (first <= aWarning.lineno <= last): continue # The warning points to this function, flush it and move on # to the next warning. toFlush.append(aWarning) break # Remove everything which is being flushed. map(self._warnings.remove, toFlush)
return [ {'message': w.message, 'category': w.category, 'filename': w.filename, 'lineno': w.lineno} for w in toFlush]
def addCleanup(self, f, *args, **kwargs): """ Add the given function to a list of functions to be called after the test has run, but before C{tearDown}.
Functions will be run in reverse order of being added. This helps ensure that tear down complements set up.
The function C{f} may return a Deferred. If so, C{TestCase} will wait until the Deferred has fired before proceeding to the next function. """ self._cleanups.append((f, args, kwargs))
def callDeprecated(self, version, f, *args, **kwargs): """ Call a function that was deprecated at a specific version.
@param version: The version that the function was deprecated in. @param f: The deprecated function to call. @return: Whatever the function returns. """ result = f(*args, **kwargs) warningsShown = self.flushWarnings([self.callDeprecated])
if len(warningsShown) == 0: self.fail('%r is not deprecated.' % (f,))
observedWarning = warningsShown[0]['message'] expectedWarning = getDeprecationWarningString(f, version) self.assertEqual(expectedWarning, observedWarning)
return result
def _runCleanups(self): """ Run the cleanups added with L{addCleanup} in order.
@return: A C{Deferred} that fires when all cleanups are run. """ def _makeFunction(f, args, kwargs): return lambda: f(*args, **kwargs) callables = [] while len(self._cleanups) > 0: f, args, kwargs = self._cleanups.pop() callables.append(_makeFunction(f, args, kwargs)) return util._runSequentially(callables)
def patch(self, obj, attribute, value): """ Monkey patch an object for the duration of the test.
The monkey patch will be reverted at the end of the test using the L{addCleanup} mechanism.
The L{MonkeyPatcher} is returned so that users can restore and re-apply the monkey patch within their tests.
@param obj: The object to monkey patch. @param attribute: The name of the attribute to change. @param value: The value to set the attribute to. @return: A L{monkey.MonkeyPatcher} object. """ monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value)) monkeyPatch.patch() self.addCleanup(monkeyPatch.restore) return monkeyPatch
def runTest(self): """ If no C{methodName} argument is passed to the constructor, L{run} will treat this method as the thing with the actual test inside. """
def run(self, result): """ Run the test case, storing the results in C{result}.
First runs C{setUp} on self, then runs the test method (defined in the constructor), then runs C{tearDown}. Any of these may return L{Deferred}s. After they complete, does some reactor cleanup.
@param result: A L{TestResult} object. """ log.msg("--> %s <--" % (self.id())) from twisted.internet import reactor new_result = itrial.IReporter(result, None) if new_result is None: result = PyUnitResultAdapter(result) else: result = new_result self._timedOut = False result.startTest(self) if self.getSkip(): # don't run test methods that are marked as .skip result.addSkip(self, self.getSkip()) result.stopTest(self) return self._installObserver()
# All the code inside runThunk will be run such that warnings emitted # by it will be collected and retrievable by flushWarnings. def runThunk(): self._passed = False self._deprecateReactor(reactor) try: d = self.deferSetUp(None, result) try: self._wait(d) finally: self._cleanUp(result) self._classCleanUp(result) finally: self._undeprecateReactor(reactor)
self._warnings = [] _collectWarnings(self._warnings.append, runThunk)
# Any collected warnings which the test method didn't flush get # re-emitted so they'll be logged or show up on stdout or whatever. for w in self.flushWarnings(): try: warnings.warn_explicit(**w) except: result.addError(self, failure.Failure())
result.stopTest(self)
def _getReason(self, f): if len(f.value.args) > 0: reason = f.value.args[0] else: warnings.warn(("Do not raise unittest.SkipTest with no " "arguments! Give a reason for skipping tests!"), stacklevel=2) reason = f return reason
def getSkip(self): """ Return the skip reason set on this test, if any is set. Checks on the instance first, then the class, then the module, then packages. As soon as it finds something with a C{skip} attribute, returns that. Returns C{None} if it cannot find anything. See L{TestCase} docstring for more details. """ return util.acquireAttribute(self._parents, 'skip', None)
def getTodo(self): """ Return a L{Todo} object if the test is marked todo. Checks on the instance first, then the class, then the module, then packages. As soon as it finds something with a C{todo} attribute, returns that. Returns C{None} if it cannot find anything. See L{TestCase} docstring for more details. """ todo = util.acquireAttribute(self._parents, 'todo', None) if todo is None: return None return makeTodo(todo)
def getTimeout(self): """ Returns the timeout value set on this test. Checks on the instance first, then the class, then the module, then packages. As soon as it finds something with a C{timeout} attribute, returns that. Returns L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See L{TestCase} docstring for more details. """ timeout = util.acquireAttribute(self._parents, 'timeout', util.DEFAULT_TIMEOUT_DURATION) try: return float(timeout) except (ValueError, TypeError): # XXX -- this is here because sometimes people will have methods # called 'timeout', or set timeout to 'orange', or something # Particularly, test_news.NewsTestCase and ReactorCoreTestCase # both do this. warnings.warn("'timeout' attribute needs to be a number.", category=DeprecationWarning) return util.DEFAULT_TIMEOUT_DURATION
def getSuppress(self): """ Returns any warning suppressions set for this test. Checks on the instance first, then the class, then the module, then packages. As soon as it finds something with a C{suppress} attribute, returns that. Returns any empty list (i.e. suppress no warnings) if it cannot find anything. See L{TestCase} docstring for more details. """ return util.acquireAttribute(self._parents, 'suppress', [])
def visit(self, visitor): """ Visit this test case. Call C{visitor} with C{self} as a parameter.
Deprecated in Twisted 8.0.
@param visitor: A callable which expects a single parameter: a test case.
@return: None """ warnings.warn("Test visitors deprecated in Twisted 8.0", category=DeprecationWarning) visitor(self)
def mktemp(self): """Returns a unique name that may be used as either a temporary directory or filename.
@note: you must call os.mkdir on the value returned from this method if you wish to use it as a directory! """ MAX_FILENAME = 32 # some platforms limit lengths of filenames base = os.path.join(self.__class__.__module__[:MAX_FILENAME], self.__class__.__name__[:MAX_FILENAME], self._testMethodName[:MAX_FILENAME]) if not os.path.exists(base): os.makedirs(base) dirname = tempfile.mkdtemp('', '', base) return os.path.join(dirname, 'temp')
def _wait(self, d, running=_wait_is_running): """Take a Deferred that only ever callbacks. Block until it happens. """ from twisted.internet import reactor if running: raise RuntimeError("_wait is not reentrant")
results = [] def append(any): if results is not None: results.append(any) def crash(ign): if results is not None: reactor.crash() crash = utils.suppressWarnings( crash, util.suppress(message=r'reactor\.crash cannot be used.*', category=DeprecationWarning)) def stop(): reactor.crash() stop = utils.suppressWarnings( stop, util.suppress(message=r'reactor\.crash cannot be used.*', category=DeprecationWarning))
running.append(None) try: d.addBoth(append) if results: # d might have already been fired, in which case append is # called synchronously. Avoid any reactor stuff. return d.addBoth(crash) reactor.stop = stop try: reactor.run() finally: del reactor.stop
# If the reactor was crashed elsewhere due to a timeout, hopefully # that crasher also reported an error. Just return. # _timedOut is most likely to be set when d has fired but hasn't # completed its callback chain (see self._run) if results or self._timedOut: #defined in run() and _run() return
# If the timeout didn't happen, and we didn't get a result or # a failure, then the user probably aborted the test, so let's # just raise KeyboardInterrupt.
# FIXME: imagine this: # web/test/test_webclient.py: # exc = self.assertRaises(error.Error, wait, method(url)) # # wait() will raise KeyboardInterrupt, and assertRaises will # swallow it. Therefore, wait() raising KeyboardInterrupt is # insufficient to stop trial. A suggested solution is to have # this code set a "stop trial" flag, or otherwise notify trial # that it should really try to stop as soon as possible. raise KeyboardInterrupt() finally: results = None running.pop()
class UnsupportedTrialFeature(Exception): """A feature of twisted.trial was used that pyunit cannot support."""
class PyUnitResultAdapter(object): """ Wrap a C{TestResult} from the standard library's C{unittest} so that it supports the extended result types from Trial, and also supports L{twisted.python.failure.Failure}s being passed to L{addError} and L{addFailure}. """
def __init__(self, original): """ @param original: A C{TestResult} instance from C{unittest}. """ self.original = original
def _exc_info(self, err): return util.excInfoOrFailureToExcInfo(err)
def startTest(self, method): self.original.startTest(method)
def stopTest(self, method): self.original.stopTest(method)
def addFailure(self, test, fail): self.original.addFailure(test, self._exc_info(fail))
def addError(self, test, error): self.original.addError(test, self._exc_info(error))
def _unsupported(self, test, feature, info): self.original.addFailure( test, (UnsupportedTrialFeature, UnsupportedTrialFeature(feature, info), None))
def addSkip(self, test, reason): """ Report the skip as a failure. """ self._unsupported(test, 'skip', reason)
def addUnexpectedSuccess(self, test, todo): """ Report the unexpected success as a failure. """ self._unsupported(test, 'unexpected success', todo)
def addExpectedFailure(self, test, error): """ Report the expected failure (i.e. todo) as a failure. """ self._unsupported(test, 'expected failure', error)
def addSuccess(self, test): self.original.addSuccess(test)
def upDownError(self, method, error, warn, printStatus): pass
def suiteVisit(suite, visitor): """ Visit each test in C{suite} with C{visitor}.
Deprecated in Twisted 8.0.
@param visitor: A callable which takes a single argument, the L{TestCase} instance to visit. @return: None """ warnings.warn("Test visitors deprecated in Twisted 8.0", category=DeprecationWarning) for case in suite._tests: visit = getattr(case, 'visit', None) if visit is not None: visit(visitor) elif isinstance(case, pyunit.TestCase): case = itrial.ITestCase(case) case.visit(visitor) elif isinstance(case, pyunit.TestSuite): suiteVisit(case, visitor) else: case.visit(visitor)
class TestSuite(pyunit.TestSuite): """ Extend the standard library's C{TestSuite} with support for the visitor pattern and a consistently overrideable C{run} method. """
visit = suiteVisit
def __call__(self, result): return self.run(result)
def run(self, result): """ Call C{run} on every member of the suite. """ # we implement this because Python 2.3 unittest defines this code # in __call__, whereas 2.4 defines the code in run. for test in self._tests: if result.shouldStop: break test(result) return result
class TestDecorator(components.proxyForInterface(itrial.ITestCase, "_originalTest")): """ Decorator for test cases.
@param _originalTest: The wrapped instance of test. @type _originalTest: A provider of L{itrial.ITestCase} """
implements(itrial.ITestCase)
def __call__(self, result): """ Run the unit test.
@param result: A TestResult object. """ return self.run(result)
def run(self, result): """ Run the unit test.
@param result: A TestResult object. """ return self._originalTest.run( reporter._AdaptedReporter(result, self.__class__))
def _clearSuite(suite): """ Clear all tests from C{suite}.
This messes with the internals of C{suite}. In particular, it assumes that the suite keeps all of its tests in a list in an instance variable called C{_tests}. """ suite._tests = []
def decorate(test, decorator): """ Decorate all test cases in C{test} with C{decorator}.
C{test} can be a test case or a test suite. If it is a test suite, then the structure of the suite is preserved.
L{decorate} tries to preserve the class of the test suites it finds, but assumes the presence of the C{_tests} attribute on the suite.
@param test: The C{TestCase} or C{TestSuite} to decorate.
@param decorator: A unary callable used to decorate C{TestCase}s.
@return: A decorated C{TestCase} or a C{TestSuite} containing decorated C{TestCase}s. """
try: tests = iter(test) except TypeError: return decorator(test)
# At this point, we know that 'test' is a test suite. _clearSuite(test)
for case in tests: test.addTest(decorate(case, decorator)) return test
class _PyUnitTestCaseAdapter(TestDecorator): """ Adapt from pyunit.TestCase to ITestCase. """
def visit(self, visitor): """ Deprecated in Twisted 8.0. """ warnings.warn("Test visitors deprecated in Twisted 8.0", category=DeprecationWarning) visitor(self)
class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter): """ Adapter for pyunit-style C{TestCase} subclasses that have undesirable id() methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}. """
def id(self): """ Return the fully-qualified Python name of the doctest. """ testID = self._originalTest.shortDescription() if testID is not None: return testID return self._originalTest.id()
class _ForceGarbageCollectionDecorator(TestDecorator): """ Forces garbage collection to be run before and after the test. Any errors logged during the post-test collection are added to the test result as errors. """
def run(self, result): gc.collect() TestDecorator.run(self, result) _logObserver._add() gc.collect() for error in _logObserver.getErrors(): result.addError(self, error) _logObserver.flushErrors() _logObserver._remove()
components.registerAdapter( _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase)
components.registerAdapter( _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase)
_docTestCase = getattr(doctest, 'DocTestCase', None) if _docTestCase: components.registerAdapter( _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase)
def _iterateTests(testSuiteOrCase): """ Iterate through all of the test cases in C{testSuiteOrCase}. """ try: suite = iter(testSuiteOrCase) except TypeError: yield testSuiteOrCase else: for test in suite: for subtest in _iterateTests(test): yield subtest
# Support for Python 2.3 try: iter(pyunit.TestSuite()) except TypeError: # Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it! def __iter__(self): return iter(self._tests) pyunit.TestSuite.__iter__ = __iter__
class _SubTestCase(TestCase): def __init__(self): TestCase.__init__(self, 'run')
_inst = _SubTestCase()
def _deprecate(name): """ Internal method used to deprecate top-level assertions. Do not use this. """ def _(*args, **kwargs): warnings.warn("unittest.%s is deprecated. Instead use the %r " "method on unittest.TestCase" % (name, name), stacklevel=2, category=DeprecationWarning) return getattr(_inst, name)(*args, **kwargs) return _
_assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals', 'failUnless', 'failUnlessIdentical', 'failUnlessIn', 'failIfIdentical', 'failIfIn', 'failIf', 'failUnlessAlmostEqual', 'failIfAlmostEqual', 'failUnlessRaises', 'assertApproximates', 'assertFailure', 'failUnlessSubstring', 'failIfSubstring', 'assertAlmostEqual', 'assertAlmostEquals', 'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual', 'assertEquals', 'assertNotEqual', 'assertNotEquals', 'assertRaises', 'assert_', 'assertIdentical', 'assertNotIdentical', 'assertIn', 'assertNotIn', 'failUnlessFailure', 'assertSubstring', 'assertNotSubstring']
for methodName in _assertions: globals()[methodName] = _deprecate(methodName)
__all__ = ['TestCase', 'FailTest', 'SkipTest']
|