# Viewing file: test_athena.py (59.54 KB)
import os, sets from itertools import izip from xml.dom.minidom import parseString
from twisted.trial import unittest from twisted.python import util from twisted.internet.defer import Deferred from twisted.application.service import IServiceMaker from twisted.application.internet import TCPServer from twisted.python.reflect import qual from twisted.python.usage import UsageError from twisted.plugin import IPlugin
from nevow import athena, rend, tags, flat, loaders, url from nevow.loaders import stan from nevow.athena import LiveElement from nevow.appserver import NevowSite from nevow.inevow import IRequest from nevow.context import WovenContext from nevow.testutil import FakeRequest, renderPage, renderLivePage, CSSModuleTestMixin from nevow._widget_plugin import WidgetPluginRoot from nevow._widget_plugin import ElementRenderingLivePage
from twisted.plugins.nevow_widget import widgetServiceMaker
class MappingResourceTests(unittest.TestCase):
    """
    Tests covering rendering of and child lookup on
    L{athena.MappingResource}.
    """
    def test_renderMapping(self):
        """
        Rendering a L{athena.MappingResource} directly produces a
        L{rend.FourOhFour}.
        """
        resource = athena.MappingResource({})
        rendered = resource.renderHTTP(None)
        self.failUnless(isinstance(rendered, rend.FourOhFour))


    def test_lookupNonExistentKey(self):
        """
        Looking up a segment which is not a key of the mapping results in
        L{rend.NotFound}.
        """
        resource = athena.MappingResource({'name': 'value'})
        found = resource.locateChild(None, ('key',))
        self.assertEquals(found, rend.NotFound)


    def test_lookupKey(self):
        """
        Looking up a segment which is a key of the mapping returns whatever
        the C{resourceFactory} method produces, with no segments left over.
        """
        resource = athena.MappingResource({'name': 'value'})
        # Substitute a factory whose result is easy to compare against.
        resource.resourceFactory = sets.Set
        child, remaining = resource.locateChild(None, ('name',))
        self.assertEquals(remaining, [])
        self.assertEquals(child, sets.Set('value'))
class ModuleRegistryTestMixin:
    """
    Shared tests for module registry objects.

    The class this is mixed into must provide C{registryClass} (the registry
    type under test) and C{moduleClass} (the type of module the registry is
    expected to return), along with C{mktemp} and the assertion methods used
    below.
    """
    def test_getModuleForName(self):
        """
        C{getModuleForName} returns an instance of C{moduleClass} carrying
        the requested name and the very mapping the registry was built with.
        """
        name = u'test_getModuleForName'
        packages = {name: self.mktemp()}
        registry = self.registryClass(packages)
        module = registry.getModuleForName(name)
        self.assertTrue(isinstance(module, self.moduleClass))
        self.assertEqual(module.name, name)
        self.assertIdentical(module.mapping, packages)


    def test_getModuleForNameUnknown(self):
        """
        C{getModuleForName} raises L{RuntimeError} when asked for a module
        which is not in the registry's mapping.
        """
        name = u'test_getModuleForName'
        registry = self.registryClass({})
        self.assertRaises(RuntimeError, registry.getModuleForName, name)
class CSSRegistryTests(unittest.TestCase, ModuleRegistryTestMixin):
    """
    Tests for L{athena.CSSRegistry}.
    """
    # Consumed by the tests inherited from ModuleRegistryTestMixin.
    registryClass = athena.CSSRegistry
    moduleClass = athena.CSSModule


    def test_getModuleForNameLoad(self):
        """
        L{athena.CSSRegistry} should initialize its mapping from
        L{athena.allCSSPackages} as needed.
        """
        moduleName = u'test_getModuleForNameLoad'
        origAllCSSPackages = athena.allCSSPackages
        theCSSPackages = {moduleName: self.mktemp()}
        athena.allCSSPackages = lambda: theCSSPackages
        try:
            # Everything which runs while the module-level function is
            # replaced lives inside the try block, so the original is put
            # back even if constructing the registry fails.
            reg = athena.CSSRegistry()
            mod = reg.getModuleForName(moduleName)
        finally:
            athena.allCSSPackages = origAllCSSPackages
        self.assertEqual(mod.name, moduleName)
        self.assertEqual(mod.mapping, theCSSPackages)
class JSDependenciesTests(unittest.TestCase, ModuleRegistryTestMixin):
    """
    Tests for L{athena.JSDependencies}.
    """
    # Consumed by the tests inherited from ModuleRegistryTestMixin.
    registryClass = athena.JSDependencies
    moduleClass = athena.JSModule


    def test_getModuleForNameLoad(self):
        """
        L{athena.JSDependencies} should initialize its mapping from
        L{athena.allJavascriptPackages} as needed.
        """
        moduleName = u'test_getModuleForNameLoad'
        origAllJavascriptPackages = athena.allJavascriptPackages
        theJavascriptPackages = {moduleName: self.mktemp()}
        athena.allJavascriptPackages = lambda: theJavascriptPackages
        try:
            # Everything which runs while the module-level function is
            # replaced lives inside the try block, so the original is put
            # back even if constructing the registry fails.
            reg = athena.JSDependencies()
            mod = reg.getModuleForName(moduleName)
        finally:
            athena.allJavascriptPackages = origAllJavascriptPackages
        self.assertEqual(mod.name, moduleName)
        self.assertEqual(mod.mapping, theJavascriptPackages)
class AthenaModuleTestMixin:
    """
    Tests shared by L{athena.AthenaModule} and the classes derived from it.

    Subclasses select the class under test by overriding C{moduleClass}.
    """
    # Fake module source.  Only the "// import <name>" lines matter to the
    # tests below; the rest is filler.
    testModuleImpl = """\
lalal this is javascript honest
// uh oh! a comment! gee I wish javascript had an import system
// import ExampleModule
here is some more javascript code
// import Another
// import Module
the end
"""
    moduleClass = athena.AthenaModule


    def setUp(self):
        """
        Save L{testModuleImpl} in a fresh temporary file and remember the
        file's name as C{testModuleFilename}.
        """
        path = self.mktemp()
        self.testModuleFilename = path
        source = file(path, 'w')
        source.write(self.testModuleImpl)
        source.close()


    def test_getOrCreate(self):
        """
        Two C{getOrCreate} calls for the same name give back the very same
        module object.
        """
        mapping = {'testmodule': self.testModuleFilename}
        first = self.moduleClass.getOrCreate('testmodule', mapping)
        second = self.moduleClass.getOrCreate('testmodule', mapping)

        self.assertTrue(isinstance(first, self.moduleClass))
        self.assertEquals(first.name, 'testmodule')

        self.assertIdentical(first, second)


    def _doDependencySetup(self):
        """
        Build a module mapping with two levels of dependencies.

        C{testmodule} is the file written by L{setUp}; C{Another} and
        C{ExampleModule} each import one further module; the remaining
        modules all share a single empty file.

        @return: a C{dict} mapping module names to file names.
        """
        blankPath = self.mktemp()
        file(blankPath, 'w').close()
        modules = {
            'testmodule': self.testModuleFilename,
            'Another': self.mktemp(),
            'ExampleModule': self.mktemp(),
            'Module': blankPath,
            'SecondaryDependency': blankPath,
            'ExampleDependency': blankPath}

        sources = (
            ('Another', '// import SecondaryDependency\n'),
            ('ExampleModule', '// import ExampleDependency\n'))
        for name, importLine in sources:
            fObj = file(modules[name], 'w')
            fObj.write(importLine)
            fObj.close()

        return modules


    def test_dependencies(self):
        """
        C{dependencies} reports only the modules imported directly by the
        module's own source.
        """
        mapping = self._doDependencySetup()
        module = self.moduleClass.getOrCreate('testmodule', mapping)
        names = [dep.name for dep in module.dependencies()]
        names.sort()
        self.assertEquals(names, ['Another', 'ExampleModule', 'Module'])


    def test_allDependencies(self):
        """
        C{allDependencies} lists every module ahead of the modules which
        depend upon it.
        """
        depgraph = {
            'Another': ['SecondaryDependency'],
            'ExampleModule': ['ExampleDependency'],
            'Module': [],
            'testmodule': ['Another', 'ExampleModule', 'Module'],
            'SecondaryDependency': [],
            'ExampleDependency': []}

        mapping = self._doDependencySetup()
        module = self.moduleClass.getOrCreate('testmodule', mapping)

        names = [dep.name for dep in module.allDependencies()]
        for dependent in names:
            for prerequisite in depgraph[dependent]:
                # A prerequisite has to show up earlier in the list than
                # anything which needs it.
                self.assertIn(prerequisite, names)
                self.assertIn(dependent, names)
                self.failUnless(
                    names.index(prerequisite) < names.index(dependent))


    def test_crlfNewlines(self):
        """
        A CR LF line ending after an imported module's name does not end up
        as part of that name.
        """
        fooPath = self.mktemp()
        fooFile = file(fooPath, 'wb')
        fooFile.write('// import Bar\r\n')
        fooFile.close()
        barPath = self.mktemp()
        file(barPath, 'wb').close()

        mapping = {'Foo': fooPath, 'Bar': barPath}
        foo = self.moduleClass('Foo', mapping)
        found = list(foo.dependencies())
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].name, u'Bar')


    def test_dependencyCaching(self):
        """
        Imports are extracted once and re-extracted only after the source
        file's modification time changes.
        """
        path = self.mktemp()
        source = file(path, 'w')
        source.write('')
        source.close()

        module = self.moduleClass('testmodule', {'testmodule': path})
        module.extractCounter = 0
        realExtractImports = module._extractImports

        def countingExtractImports(x):
            # Record the call, then defer to the real implementation.
            module.extractCounter += 1
            return realExtractImports(x)
        module._extractImports = countingExtractImports

        # The first request has to read the file.
        list(module.dependencies())
        self.assertEquals(module.extractCounter, 1)

        # An unchanged file is not read a second time.
        list(module.dependencies())
        self.assertEquals(module.extractCounter, 1)

        # Moving the modification time forward forces a re-read.
        bumped = module.lastModified + 1
        os.utime(path, (bumped, bumped))
        list(module.dependencies())
        self.assertEquals(module.extractCounter, 2)


    def test_packageDependencies(self):
        """
        The package containing a module is one of that module's
        dependencies.
        """
        mapping = {u'Foo': self.mktemp(), u'Foo.Bar': self.mktemp()}
        for name in (u'Foo', u'Foo.Bar'):
            file(mapping[name], 'wb').close()
        package = self.moduleClass.getOrCreate(u'Foo', mapping)
        member = self.moduleClass.getOrCreate(u'Foo.Bar', mapping)
        self.assertIn(package, member.allDependencies())


    def test_repr(self):
        """
        The C{repr} of a module shows its class name and its module name.
        """
        name = u'Foo.Bar'
        module = self.moduleClass(name, {name: self.mktemp()})
        expected = '%s(%r)' % (self.moduleClass.__name__, name)
        self.assertEqual(repr(module), expected)
class AthenaModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Run the L{AthenaModuleTestMixin} tests against L{athena.AthenaModule}.
    """
    moduleClass = athena.AthenaModule
class JSModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Run the L{AthenaModuleTestMixin} tests against L{athena.JSModule}.
    """
    moduleClass = athena.JSModule
class CSSModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Run the L{AthenaModuleTestMixin} tests against L{athena.CSSModule}.
    """
    moduleClass = athena.CSSModule
class _CountingAthenaModule(athena.AthenaModule):
    """
    L{athena.AthenaModule} subclass which counts how many times
    L{dependencies} has been called on each instance.
    """
    # Class-level default; the first call on an instance shadows it with an
    # instance attribute.
    count = 0


    def dependencies(self):
        """
        Note the call in C{count}, then delegate to the base class.
        """
        self.count = self.count + 1
        parent = super(_CountingAthenaModule, self)
        return parent.dependencies()
class MemoizationTests(unittest.TestCase):
    """
    Tests for the memoization of module dependencies.
    """
    def _outputToTempFile(self, s):
        """
        Store C{s} in a newly named temporary file.

        @param s: file contents
        @type s: C{str}

        @return: name of the file which was written
        @rtype: C{str}
        """
        path = self.mktemp()
        out = file(path, 'w')
        out.write(s)
        out.close()
        return path


    def setUp(self):
        """
        Reset the module cache on L{_CountingAthenaModule} and create the
        source files for a small dependency graph.
        """
        # AthenaModule keeps a global mapping of module names to module
        # objects in a class attribute; we overwrite this here (on
        # _CountingAthenaModule) to ensure that all modules will be loaded
        # for every test run.
        _CountingAthenaModule._modules = {}

        emptyPath = self._outputToTempFile('')
        quuxPath = self._outputToTempFile('// import Foo')
        topPath = self._outputToTempFile('// import Quux\n'
                                         '// import Quux2')
        # Quux and Quux2 deliberately share one source file.
        self.modules = {
            'Top': topPath,
            'Quux': quuxPath,
            'Quux2': quuxPath,
            'Foo': emptyPath}


    def test_noGlobalMemo(self):
        """
        Without a memo argument, every call to
        L{AthenaModule.allDependencies} asks the module for its direct
        dependencies (via L{AthenaModule.dependencies}) exactly once.
        """
        _CountingAthenaModule.getOrCreate('Foo', self.modules)
        top = _CountingAthenaModule.getOrCreate('Top', self.modules)

        self.assertEqual(top.count, 0)
        list(top.allDependencies())
        self.assertEqual(top.count, 1)
        list(top.allDependencies())
        self.assertEqual(top.count, 2)


    def test_withGlobalMemo(self):
        """
        When the same memo is passed to several C{allDependencies()} calls,
        a module's direct dependencies are retrieved only the first time.
        """
        _CountingAthenaModule.getOrCreate('Foo', self.modules)
        top = _CountingAthenaModule.getOrCreate('Top', self.modules)

        sharedMemo = {}
        self.assertEqual(top.count, 0)
        list(top.allDependencies(sharedMemo))
        self.assertEqual(top.count, 1)
        list(top.allDependencies(sharedMemo))
        self.assertEqual(top.count, 1)
class ModuleInteractionTests(unittest.TestCase):
    """
    Tests for how JS and CSS modules interact with one another.
    """
    def test_separateModuleNamespace(self):
        """
        Asking L{athena.CSSModule} and L{athena.JSModule} for a module of
        the same name yields two distinct objects, each of the class it was
        requested from.
        """
        name = u'test_separateModuleNamespace'
        cssModule = athena.CSSModule.getOrCreate(name, {name: self.mktemp()})
        jsModule = athena.JSModule.getOrCreate(name, {name: self.mktemp()})
        self.assertNotIdentical(cssModule, jsModule)
        self.assertTrue(isinstance(cssModule, athena.CSSModule))
        self.assertTrue(isinstance(jsModule, athena.JSModule))
class _AutoPackageTestMixin:
    """
    Tests shared by L{athena.AutoJSPackage} and L{athena.AutoCSSPackage}.

    Subclasses set C{packageFactory} to the package class under test and
    C{moduleExtension} to the file extension (without the dot) it collects.
    """
    packageFactory = None
    moduleExtension = None


    def test_package(self):
        """
        L{packageFactory} builds a module-name-to-path mapping from a
        directory tree, leaving out dot-prefixed files and directories and
        files with another extension.
        """
        root = self.mktemp()
        os.makedirs(os.path.join(root, 'Foo', 'Baz'))

        def touch(*segments):
            # Create an empty file beneath the package root, return its path.
            target = os.path.join(root, *segments)
            file(target, 'w').close()
            return target

        ext = self.moduleExtension
        expected = {
            u'Foo': touch('Foo', '__init__.' + ext),
            u'Foo.Bar': touch('Foo', 'Bar.' + ext),
            # Foo/Baz has no __init__ file; the stock empty module is
            # expected in its place.
            u'Foo.Baz': util.sibpath(athena.__file__, 'empty-module.' + ext),
            u'Foo.Baz.Quux': touch('Foo', 'Baz', 'Quux.' + ext)}

        # None of the following may appear in the mapping.
        touch('Foo', '.foo.' + ext)
        os.mkdir(os.path.join(root, 'Foo', '.test'))
        touch('Foo', '.test', 'Foo.' + ext)
        touch('Foo', 'Bar.other')
        touch('Foo', 'Zot.other')

        package = self.packageFactory(root)
        for moduleName, modulePath in expected.iteritems():
            actualPath = package.mapping.pop(moduleName)
            self.assertEquals(actualPath, modulePath)
        # Nothing beyond the expected modules was collected.
        self.assertEquals(package.mapping, {})
class AutoJSPackageTests(unittest.TestCase, _AutoPackageTestMixin):
    """
    Run the L{_AutoPackageTestMixin} tests against L{athena.AutoJSPackage}.
    """
    packageFactory = athena.AutoJSPackage
    moduleExtension = 'js'
class AutoCSSPackageTests(unittest.TestCase, _AutoPackageTestMixin):
    """
    Run the L{_AutoPackageTestMixin} tests against L{athena.AutoCSSPackage}.
    """
    packageFactory = athena.AutoCSSPackage
    moduleExtension = 'css'
class UtilitiesTests(unittest.TestCase):
    """
    Tests for assorted Athena helper functionality.
    """
    def test_preprocessorCollection(self):
        """
        C{rend._getPreprocessors} gathers the C{preprocessors} of every
        class in an instance's hierarchy, while a C{preprocessors}
        attribute set on the instance itself replaces all of them.
        """
        fromBase, fromOtherBase, fromDerived = object(), object(), object()

        class Base(object):
            preprocessors = [fromBase]

        class OtherBase(object):
            preprocessors = [fromOtherBase]

        class Derived(Base, OtherBase):
            preprocessors = [fromDerived]

        inst = Derived()
        collected = rend._getPreprocessors(inst)
        self.assertEqual(collected, [fromBase, fromOtherBase, fromDerived])

        override = object()
        inst.preprocessors = [override]
        collected = rend._getPreprocessors(inst)
        self.assertEqual(collected, [override])


    def test_handlerMacro(self):
        """
        An athena:handler node inside a tag is rewritten into an event
        attribute holding the matching JavaScript call.
        """
        javascript = (
            'return Nevow.Athena.Widget.handleEvent('
            'this, "onclick", "bar");')
        original = tags.span[athena.handler(event='onclick', handler='bar')]
        rewritten = athena._rewriteEventHandlerToAttribute(original)
        markup = flat.flatten(rewritten)
        self.assertEquals(
            markup,
            '<span onclick="' + javascript + '"></span>')


    def test_handlerMacroAgainstList(self):
        """
        The handler macro accepts a list and, when the list holds no
        handlers, returns something equal to it.
        """
        items = ["hello", " ", "world"]
        result = athena._rewriteEventHandlerToAttribute(items)
        self.assertEquals(result, items)


    def test_athenaIdRewriting(self):
        """
        Values of C{id}, C{for} and C{headers} attributes are rewritten to
        carry the element's Athena ID; an empty C{headers} value is left
        empty.
        """
        markup = [tags.label(_for='foo'),
                  tags.input(id='foo'),
                  tags.th(headers=''),
                  tags.th(headers='foo'),
                  tags.td(headers='foo bar'),
                  tags.td(headers='foo bar baz')]
        element = athena.LiveElement(docFactory=loaders.stan(markup))
        page = athena.LivePage(docFactory=loaders.stan(element))
        element.setFragmentParent(page)

        def _verifyRendering(result):
            # Read the ID at verification time, as each assertion needs it.
            aid = element._athenaID
            self.assertIn('<input id="athenaid:%s-foo"' % (aid,), result)
            self.assertIn('<label for="athenaid:%s-foo"' % (aid,), result)
            self.assertIn('<th headers=""', result)
            self.assertIn('<th headers="athenaid:%s-foo"' % (aid,), result)
            self.assertIn(
                '<td headers="athenaid:%s-foo athenaid:%s-bar"' % (aid, aid),
                result)
            self.assertIn(
                '<td headers="athenaid:%s-foo athenaid:%s-bar athenaid:%s-baz"' % (
                    aid, aid, aid),
                result)

        rendering = renderLivePage(page)
        return rendering.addCallback(_verifyRendering)


    def test_elementPreprocessors(self):
        """
        A LiveElement's preprocessors are applied to its document when it
        is rendered.
        """
        seen = []

        markup = tags.span
        element = athena.LiveElement(docFactory=loaders.stan(markup))
        page = athena.LivePage(docFactory=loaders.stan(element))
        # list.append doubles as a preprocessor which records its argument.
        element.preprocessors = [seen.append]
        element.setFragmentParent(page)

        def rendered(result):
            page.action_close(None)
            self.assertEquals(seen, [[markup]])

        renderDeferred = renderPage(page)
        renderDeferred.addCallback(rendered)
        return renderDeferred


    def test_userAgentDetection(self):
        """
        C{LivePage._supportedBrowser} is true for each User-Agent string
        listed as supported below and false for each one listed as
        unsupported.
        """
        page = athena.LivePage()
        supported = ["Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
                     "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)",
                     "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.1.3)"
                     " Gecko/20061201 Firefox/2.0.0.3 (Ubuntu-feisty)",
                     "Mozilla/5.0 (Windows; U; Windows NT 5.2; sv-SE;"
                     " rv:1.8.0.8) Gecko/20061025 Firefox 1.5.0.8",
                     "Opera/9.20 (Windows NT 6.0; U; en)"]
        unsupported = ["Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en)"
                       " AppleWebKit/418.9.1 (KHTML, like Gecko) Safari/419.3",
                       "Opera/8.5 (Windows NT 6.0; U; en)",
                       "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US;"
                       " rv:1.7.10) Gecko/20050716 Firefox/1.0.6",
                       "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT 5.0)"]

        def verdictFor(userAgent):
            # Ask the page about a fresh request carrying this User-Agent.
            request = FakeRequest()
            request.received_headers['user-agent'] = userAgent
            return page._supportedBrowser(request)

        for userAgent in supported:
            self.assertTrue(verdictFor(userAgent))
        for userAgent in unsupported:
            self.assertFalse(verdictFor(userAgent))


    def test_unsupportedBrowserPage(self):
        """
        A request from an unsupported browser is answered with the page's
        C{unsupportedBrowserLoader} content.
        """
        ctx = WovenContext()
        page = athena.LivePage()
        request = FakeRequest()
        request.received_headers['user-agent'] = (
            "Mozilla/4.0 (compatible; MSIE 2.0; Windows NT 5.1)")
        ctx.remember(request, IRequest)
        expected = flat.flatten(page.unsupportedBrowserLoader)
        rendering = renderPage(page, reqFactory=lambda: request)
        rendering.addCallback(self.assertEqual, expected)
        return rendering
class StandardLibraryTestCase(unittest.TestCase):
    """
    Check that each module of the Nevow JavaScript "standard library" can
    be resolved through L{athena.JSDependencies}.
    """
    def setUp(self):
        """
        Create the dependency registry used by every test.
        """
        self.deps = athena.JSDependencies()


    def _importTest(self, moduleName):
        """
        Assert that C{moduleName} can be looked up and that it appears
        among the modules reported by its own C{allDependencies}.
        """
        module = self.deps.getModuleForName(moduleName)
        matches = []
        for dependency in module.allDependencies():
            if dependency.name == moduleName:
                matches.append(dependency)
        self.failUnless(matches)


    def test_divmodImport(self):
        """
        Divmod is importable.
        """
        return self._importTest('Divmod')


    def test_baseImport(self):
        """
        Divmod.Base is importable.
        """
        return self._importTest('Divmod.Base')


    def test_deferImport(self):
        """
        Divmod.Defer is importable.
        """
        return self._importTest('Divmod.Defer')


    def test_inspectImport(self):
        """
        Divmod.Inspect is importable.
        """
        return self._importTest('Divmod.Inspect')


    def test_runtimeImport(self):
        """
        Divmod.Runtime is importable.
        """
        return self._importTest('Divmod.Runtime')


    def test_xmlImport(self):
        """
        Divmod.XML is importable.
        """
        return self._importTest('Divmod.XML')


    def test_nevowImport(self):
        """
        Nevow is importable.
        """
        return self._importTest('Nevow')


    def test_athenaImport(self):
        """
        Nevow.Athena is importable.
        """
        return self._importTest('Nevow.Athena')


    def test_testImport(self):
        """
        Nevow.Athena.Test is importable.
        """
        return self._importTest('Nevow.Athena.Test')


    def test_tagLibraryImport(self):
        """
        Nevow.TagLibrary is importable.
        """
        return self._importTest('Nevow.TagLibrary')


    def test_tabbedPaneImport(self):
        """
        Nevow.TagLibrary.TabbedPane is importable.
        """
        return self._importTest('Nevow.TagLibrary.TabbedPane')
class TestFragment(athena.LiveFragment):
    """
    L{athena.LiveFragment} subclass with no behavior of its own, used as a
    fixture by the tests in this module.
    """
class Nesting(unittest.TestCase):
    """
    Tests for the parent/child bookkeeping done by C{setFragmentParent}.
    """

    def testFragmentNesting(self):
        """
        Attaching fragments from the page downwards records children,
        parents and the owning page on every level.
        """
        page = athena.LivePage()
        outer = TestFragment()
        inner = TestFragment()

        outer.setFragmentParent(page)
        inner.setFragmentParent(outer)

        # Child lists, from the top of the tree to the bottom.
        self.assertEquals(page.liveFragmentChildren, [outer])
        self.assertEquals(outer.liveFragmentChildren, [inner])
        self.assertEquals(inner.liveFragmentChildren, [])
        # Parent links, from the bottom to the top.
        self.assertEquals(inner.fragmentParent, outer)
        self.assertEquals(outer.fragmentParent, page)

        self.assertEquals(inner.page, page)
        self.assertEquals(outer.page, page)


    def testInsideOutFragmentNesting(self):
        """
        Attaching the innermost fragment first and its parent to the page
        afterwards gives the same parent/child relationships.
        """
        inner = TestFragment()
        outer = TestFragment()
        page = athena.LivePage()

        inner.setFragmentParent(outer)
        outer.setFragmentParent(page)

        self.assertEquals(page.liveFragmentChildren, [outer])
        self.assertEquals(outer.fragmentParent, page)
        self.assertEquals(outer.page, page)

        self.assertEquals(outer.liveFragmentChildren, [inner])
        self.assertEquals(inner.fragmentParent, outer)
        self.assertEquals(inner.page, page)
class Tracebacks(unittest.TestCase):
    """
    Tests for turning JavaScript stack descriptions into frames.
    """
    # (function, file URL, line number) triples used to build the stack.
    frames = (('Error()', '', 0),
              ('someFunction()', 'http://somesite.com:8080/someFile', 42),
              ('anotherFunction([object Object])',
               'http://user:pass@somesite.com:8080/someOtherFile',
               69))

    # One "function@url:line" entry per frame, newline separated.
    stack = '\n'.join(['%s@%s:%d' % frame for frame in frames])

    # Exception description handed to athena.getJSFailure.
    exc = {u'name': 'SomeError',
           u'message': 'An error occurred.',
           u'stack': stack}


    def testStackParsing(self):
        """
        L{athena.parseStack} yields the frames of the stack in the reverse
        of the order in which they were written.
        """
        parsed = athena.parseStack(self.stack)
        reversedFrames = self.frames[::-1]
        for expectedFrame, parsedFrame in izip(reversedFrames, parsed):
            self.assertEquals(parsedFrame, expectedFrame)


    def testStackLengthAndOrder(self):
        """
        The failure built by L{athena.getJSFailure} has one frame per stack
        entry, and its first frame names the function of the last entry.
        """
        failure = athena.getJSFailure(self.exc, {})
        self.assertEqual(len(failure.frames), len(self.frames))
        firstFunction = failure.frames[0][0]
        lastWritten = self.frames[-1][0]
        self.assertEqual(firstFunction, lastWritten)
class _DelayedCall(object): def __init__(self, container, element): self.container = container self.element = element
def cancel(self): self.container.remove(self.element)
def mappend(transport):
    """
    Build an output callable which records delivered messages.

    @param transport: a list; every call to the returned function appends
        a shallow copy of the message list it was given.

    @return: a one-argument function taking an C{(ack, messages)} pair.
    """
    def send(ackAndMessages):
        # Unpack in the body rather than in the signature: tuple parameters
        # were removed from the language (PEP 3113), whereas this spelling
        # is accepted by every Python version and callers are unaffected.
        ack, messages = ackAndMessages
        # Copy, so that later changes to the caller's list do not alter
        # what was recorded.
        transport.append(messages[:])
    return send
class Transport(unittest.TestCase): """ Test the various states and events which can occur that are related to the server's ability to convey a message to the client.
This includes things such as the receipt of a new request or the depletion of an existing request. """
theMessage = "Immediately Send This Message"
connectTimeout = 1 transportlessTimeout = 2 idleTimeout = 3
clientID = 'FAKE ATHENA PAGE'
def liveTransportMessageReceived(self, ctx, outgoingMessage): self.outgoingMessages.append((ctx, outgoingMessage))
def setUp(self): self.transport = [] self.scheduled = [] self.events = [] self.outgoingMessages = [] self.rdm = athena.ReliableMessageDelivery( self, connectTimeout=self.connectTimeout, transportlessTimeout=self.transportlessTimeout, idleTimeout=self.idleTimeout, connectionLost=lambda reason: self.events.append(reason), scheduler=self._schedule)
def _schedule(self, n, f, *a, **kw): """ Deterministic, rigidly controlled stand-in for reactor.callLater(). """ t = (n, f, a, kw) self.scheduled.append(t) return _DelayedCall(self.scheduled, t)
def testSendMessageImmediately(self): """ Test that if there is an output channel for messages, trying to send a message immediately does so, consuming the output channel. """ self.rdm.addOutput(mappend(self.transport)) self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [[(0, self.theMessage)]]) self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [[(0, self.theMessage)]])
def testSendMessageQueued(self): """ Test that if there is no output channel when a message is sent, it will be sent once an output channel becomes available. """ self.rdm.addMessage(self.theMessage) self.rdm.addOutput(mappend(self.transport)) self.assertEquals(self.transport, [[(0, self.theMessage)]])
def testMultipleQueuedMessages(self): """ Test that if there are several messages queued they are all sent at once when an output channel becomes available. """ self.rdm.addMessage(self.theMessage) self.rdm.addMessage(self.theMessage.encode('hex')) self.rdm.addOutput(mappend(self.transport)) self.assertEquals(self.transport, [[(0, self.theMessage), (1, self.theMessage.encode('hex'))]])
def testMultipleQueuedOutputs(self): """ Test that if there are several output channels available, each message only consumes the first of them. """ secondTransport = [] self.rdm.addOutput(mappend(self.transport)) self.rdm.addOutput(mappend(secondTransport)) self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [[(0, self.theMessage)]]) self.assertEquals(secondTransport, [])
def testMessageRedelivery(self): """ Test that outputs added while there are unacknowledged messages result in re-transmits of those messages. """ secondMessage = self.theMessage + '-2' secondTransport = [] thirdTransport = [] fourthTransport = [] self.rdm.addMessage(self.theMessage) self.rdm.addMessage(secondMessage) self.rdm.addOutput(mappend(self.transport)) self.assertEquals(self.transport, [[(0, self.theMessage), (1, secondMessage)]]) self.rdm.addOutput(mappend(secondTransport)) self.assertEquals(secondTransport, [[(0, self.theMessage), (1, secondMessage)]]) self.rdm.basketCaseReceived(None, [0, []]) self.rdm.addOutput(mappend(thirdTransport)) self.assertEquals(thirdTransport, [[(1, secondMessage)]]) self.rdm.basketCaseReceived(None, [1, []]) self.rdm.addOutput(mappend(fourthTransport)) self.assertEquals(fourthTransport, [])
def testConnectTimeout(self): """ Test that a connection timeout is set up which, if allowed to expire, will cause notification of the fact that the connection was never established. """ n, f, a, kw = self.scheduled.pop() self.failIf(self.scheduled, "Too many tasks scheduled.")
self.assertEquals(n, self.connectTimeout) f(*a, **kw)
self.assertEquals(len(self.events), 1) self.events[0].trap(athena.ConnectFailed)
self.failIf(self.scheduled, "Unexpected task scheduled after connect failed.")
def testConnectSucceeds(self): """ Test that the connection timeout is cancelled when an output channel is added. """ self.failUnless(self.scheduled, "No connect timeout scheduled.") # Sanity check self.rdm.addOutput(mappend(self.transport)) n, f, a, kw = self.scheduled.pop() self.assertEquals(n, self.idleTimeout) self.failIf(self.scheduled, "Output channel added but there is still a task pending.") self.assertEquals(self.transport, [], "Received unexpected output.")
def testOutputConsumedMessageTimeout(self): """ Test that a timeout is set up when the last output is used and that if it expires, notification of the connection being lost is delivered. In particular, test that if there is a message waiting and a new output is added, the timeout behavior is correct. """ self.rdm.addMessage(self.theMessage) self.rdm.addOutput(mappend(self.transport))
n, f, a, kw = self.scheduled.pop() self.failIf(self.scheduled, "Too many tasks scheduled.")
self.assertEquals(n, self.transportlessTimeout) f(*a, **kw)
self.assertEquals(len(self.events), 1) self.events[0].trap(athena.ConnectionLost)
self.failIf(self.scheduled, "Unexpected task scheduled after connection lost.")
def testMessageConsumedOutputTimeout(self): """ Very similar to testOutputConsumedMessageTimeout, but test the case where there is an existing output and a message is added, causing it to be used. """ self.rdm.addOutput(mappend(self.transport)) self.rdm.addMessage(self.theMessage)
n, f, a, kw = self.scheduled.pop() self.failIf(self.scheduled, "Too many tasks scheduled.")
self.assertEquals(n, self.transportlessTimeout) f(*a, **kw)
self.assertEquals(len(self.events), 1) self.events[0].trap(athena.ConnectionLost)
self.failIf(self.scheduled, "Unexpected task scheduled after connection lost.")
def testOutputConnectionAdded(self): """ Test that the timeout created when the last output is used is cancelled when a new output is added. """ self.rdm.addMessage(self.theMessage) self.rdm.addOutput(mappend(self.transport))
self.assertEquals(len(self.scheduled), 1, "Transportless timeout not created.") n, f, a, kw = self.scheduled[0] self.assertEquals(n, self.transportlessTimeout, "Unexpected task still scheduled after output added.")
self.rdm.basketCaseReceived(None, [0, []])
n, f, a, kw = self.scheduled.pop() self.assertEquals(n, self.idleTimeout)
self.failIf(self.scheduled, "Unexpected task still scheduled after output added.") self.failIf(self.events, "Unexpectedly received some kind of event.")
def testIdleOutputTimeout(self): """ Test that outputs are discarded with an empty message list if they are not used within the specified interval. """ self.rdm.addOutput(mappend(self.transport))
n, f, a, kw = self.scheduled.pop() self.assertEquals(n, self.idleTimeout) self.failIf(self.scheduled, "Unexpected tasks still scheduled in addition to idle timeout task.")
f(*a, **kw)
self.assertEquals(self.transport, [[]])
def testIdleTimeoutStartsOutputlessTimeout(self): """ Test that if the last output is removed due to idleness that another timeout for the lack of any outputs is started. """ self.rdm.addOutput(mappend(self.transport))
n, f, a, kw = self.scheduled.pop() self.assertEquals(n, self.idleTimeout) f(*a, **kw)
self.failIf(self.events, "Unexpectedly received some events.")
n, f, a, kw = self.scheduled.pop() self.assertEquals(n, self.transportlessTimeout) f(*a, **kw)
self.assertEquals(len(self.events), 1) self.events[0].trap(athena.ConnectionLost)
def testPreConnectPause(self): """ Test that no outputs are used while the reliable message deliverer is paused before the first connection is made. """ self.rdm.pause() self.rdm.addOutput(mappend(self.transport))
# The connection timeout should have been cancelled and # replaced with an idle timeout. self.assertEquals(len(self.scheduled), 1) n, f, a, kw = self.scheduled[0] self.assertEquals(n, self.idleTimeout)
self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [])
self.rdm.unpause() self.assertEquals(self.transport, [[(0, self.theMessage)]])
def testTransportlessPause(self): """ Test that if the message deliverer is paused while it has no transports, it remains so and does not use an output which is added to it. """ self.rdm.addOutput(mappend(self.transport))
self.rdm.pause() self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [])
self.rdm.unpause() self.assertEquals(self.transport, [[(0, self.theMessage)]])
def testMessagelessPause(self): """ Test that if the message deliverer is paused while it has no messages, it remains so and does not use an output when a message is added. """ self.rdm.addOutput(mappend(self.transport))
self.rdm.pause() self.rdm.addMessage(self.theMessage) self.assertEquals(self.transport, [])
self.rdm.unpause() self.assertEquals(self.transport, [[(0, self.theMessage)]])
def testStaleMessages(self): """ Test that if an older basket case with fewer messages in it arrives after a more recent, complete basket case is processed, that it is properly disregarded. """ self.rdm.basketCaseReceived( None, [-1, [[0, self.theMessage], [1, self.theMessage + "-1"], [2, self.theMessage + "-2"]]]) self.assertEquals( self.outgoingMessages, [(None, self.theMessage), (None, self.theMessage + "-1"), (None, self.theMessage + "-2")]) self.outgoingMessages = []
self.rdm.basketCaseReceived( None, [-1, [[1, self.theMessage + "-1"]]]) self.assertEquals( self.outgoingMessages, [])
self.rdm.basketCaseReceived( None, [-1, [[2, self.theMessage + "-2"]]]) self.assertEquals( self.outgoingMessages, [])
def testClosing(self): """ Test that closing a reliable message deliverer causes all of outs remaining outputs to be used up with a close message and that any future outputs added to it are immediately used in a similar manner. """ self.rdm.addOutput(mappend(self.transport)) self.rdm.addOutput(mappend(self.transport)) self.rdm.close() self.assertEquals(self.transport, [[(0, (athena.CLOSE, []))], [(0, (athena.CLOSE, []))]])
self.transport = [] self.rdm.addOutput(mappend(self.transport)) self.assertEquals(self.transport, [[(0, (athena.CLOSE, []))]])
def testCloseBeforeConnect(self): """ Test that closing the reliable message deliverer before a connection is ever established properly cleans up any timeouts. """ self.rdm.close() self.failIf(self.scheduled, "Expected no scheduled calls.")
def test_closeExcessOnReceived(self): """ Test that any excess idle transports are closed when a message is received. """ secondTransport = [] self.rdm.addOutput(mappend(self.transport)) self.rdm.addOutput(mappend(secondTransport)) d = self.rdm.basketCaseReceived(None, [0, []]) self.assertEquals(self.transport, [[]]) self.assertEquals(secondTransport, [[]]) self.failIf(d.called)
def test_closeExcessOnUnpaused(self):
    """
    Unpausing the message deliverer should close any excess idle
    transports.
    """
    extraTransport = []

    # Pause twice so that a single unpause leaves the deliverer paused.
    self.rdm.pause()
    self.rdm.pause()
    self.rdm.addOutput(mappend(self.transport))
    self.rdm.addOutput(mappend(extraTransport))

    # First unpause: nothing has been written to either output yet.
    self.rdm.unpause()
    self.assertEqual(self.transport, [])
    self.assertEqual(extraTransport, [])

    # Second unpause: only the first output receives an (empty) basket.
    self.rdm.unpause()
    self.assertEqual(self.transport, [[]])
    self.assertEqual(extraTransport, [])
def test_specialUnloadSequence(self):
    """
    When the page is unloading in the browser, it needs to pre-empt its own
    message queue and send a special identifier to express that this is a
    "last gasp" message which must be dealt with immediately.
    """
    lastGasp = [athena.UNLOAD, [athena.CLOSE, []]]
    self.rdm.basketCaseReceived(None, [0, [lastGasp]])
    delivered = [(None, [athena.CLOSE, []])]
    self.assertEqual(self.outgoingMessages, delivered)
class LiveMixinTestsMixin(CSSModuleTestMixin):
    """
    Test-method defining mixin class for L{LiveElement} and L{LiveFragment}
    testing.

    @ivar elementFactory: No-argument callable which returns an object against
        which tests will be run.

    @ivar liveGlueRenderer: The name of the live glue renderer on objects
        returned from L{elementFactory}.
    """
    liveGlueRenderer = None

    def elementFactory(self):
        """
        Create the object under test.  Concrete test cases must override this.
        """
        raise NotImplementedError("%s did not implement elementFactory" % (self,))


    def _expectedStylesheetMarkup(self, page):
        """
        Flatten the stylesheet stan which C{page} produces for the
        C{Dependee} and C{Dependor} test CSS modules, in that order.
        """
        return flat.flatten(
            page.getStylesheetStan(
                [page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependee'),
                 page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependor')]))


    def test_localDetach(self):
        """
        Verify that L{_athenaDetachServer} removes the element from its parent
        and disassociates it from the page locally.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        element._athenaDetachServer()
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)


    def test_localDetachWithChildren(self):
        """
        Similar to L{test_localDetach}, but cover the case where the removed
        element has a child of its own and verify that that child is also
        detached.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        child = self.elementFactory()
        child.setFragmentParent(element)
        element._athenaDetachServer()
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)
        self.assertNotIn(child, element.liveFragmentChildren)
        self.assertIdentical(child.fragmentParent, None)
        self.assertIdentical(child.page, None)


    def test_localDetachOrphaned(self):
        """
        L{_athenaDetachServer} should raise L{athena.OrphanedFragment} if the
        element is not attached.
        """
        element = self.elementFactory()
        self.assertRaises(athena.OrphanedFragment, element._athenaDetachServer)
        page = athena.LivePage()
        element.setFragmentParent(page)
        element._athenaDetachServer()
        # Detaching a second time finds the element orphaned again.
        self.assertRaises(athena.OrphanedFragment, element._athenaDetachServer)


    def test_detach(self):
        """
        Verify that L{detach} informs the client of the event and that the
        element is detached locally once the client acknowledges this.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append((methodName, d))
            return d
        element.callRemote = callRemote

        element.detach()
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0][0], '_athenaDetachClient')
        # Simulate the client acknowledging the detach.
        calls[0][1].callback(None)
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)


    def test_detachWithChildren(self):
        """
        Similar to L{test_detach}, but cover the case where the removed element
        has a child of its own and verify that that child is also detached.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        child = self.elementFactory()
        child.setFragmentParent(element)

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append((methodName, d))
            return d
        element.callRemote = callRemote
        child.callRemote = callRemote

        element.detach()
        # Only the element itself notifies the client, not the child.
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0][0], '_athenaDetachClient')
        calls[0][1].callback(None)
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)
        self.assertNotIn(child, element.liveFragmentChildren)
        self.assertIdentical(child.fragmentParent, None)
        self.assertIdentical(child.page, None)


    def test_localDetachCallback(self):
        """
        Verify that C{detached} is called when C{_athenaDetachServer} is
        called.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        detachCall = []
        def detached():
            # Record the state seen by the hook at the time it runs.
            detachCall.append((element.fragmentParent, element.page))
        element.detached = detached

        element._athenaDetachServer()
        self.assertEqual(detachCall, [(None, None)])


    def test_detachCallback(self):
        """
        Verify that C{detached} is called when C{detach} is called locally.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        detachCall = []
        def detached():
            detachCall.append((element.fragmentParent, element.page))
        element.detached = detached

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append(d)
            return d
        element.callRemote = callRemote

        element.detach()

        # The hook must not run until the client has acknowledged.
        self.assertEqual(detachCall, [])
        calls[0].callback(None)
        self.assertEqual(detachCall, [(None, None)])


    def test_glueIncludesStylesheets(self):
        """
        Our element's glue should include inline stylesheet references.
        """
        element = self.elementFactory()
        element.cssModule = u'TestCSSModuleDependencies.Dependor'
        element.docFactory = loaders.stan(
            tags.div(render=tags.directive(self.liveGlueRenderer)))

        page = ElementRenderingLivePage(element)
        page.cssModules = self._makeCSSRegistry()

        D = renderLivePage(page)
        def cbRendered(result):
            self.assertIn(self._expectedStylesheetMarkup(page), result)
        D.addCallback(cbRendered)
        return D


    def test_glueIncludesStylesheetsOnce(self):
        """
        Our element's glue shouldn't include redundant stylesheet references.
        """
        element = self.elementFactory()
        element.cssModule = u'TestCSSModuleDependencies.Dependor'
        element.docFactory = loaders.stan(
            tags.div(render=tags.directive(self.liveGlueRenderer)))

        page = ElementRenderingLivePage(element)
        # Render the same element twice on one page.
        page.docFactory = loaders.stan(tags.invisible[
            tags.invisible(render=tags.directive('liveglue')),
            tags.invisible(render=tags.directive('element')),
            tags.invisible(render=tags.directive('element'))])
        page.cssModules = self._makeCSSRegistry()

        D = renderLivePage(page)
        def cbRendered(result):
            expected = self._expectedStylesheetMarkup(page)
            self.assertIn(expected, result)
            # Presence alone does not prove the absence of duplicates; the
            # references must show up exactly once despite the two renders.
            self.assertEqual(result.count(expected), 1)
        D.addCallback(cbRendered)
        return D
class LiveElementTests(LiveMixinTestsMixin, unittest.TestCase):
    """
    Run the shared live-mixin tests against L{nevow.athena.LiveElement}.
    """
    liveGlueRenderer = 'liveElement'
    elementFactory = athena.LiveElement
class LiveFragmentTests(LiveMixinTestsMixin, unittest.TestCase):
    """
    Run the shared live-mixin tests against L{nevow.athena.LiveFragment}.
    """
    liveGlueRenderer = 'liveFragment'
    elementFactory = athena.LiveFragment
class DummyLiveElement(LiveElement):
    """
    A "counting" Athena element for tests which involve the plugin system
    (for example, as the value given to the "--element" option).
    """
    # Number of instances created so far; shared by all instances.
    classCounter = 0

    def __init__(self):
        """
        Initialize the element and give it a 'counter' attribute holding a
        unique, incrementing ID which tests use to tell instances apart.
        """
        LiveElement.__init__(self)
        serial = DummyLiveElement.classCounter + 1
        DummyLiveElement.classCounter = serial
        self.counter = serial
class LivePageTests(unittest.TestCase, CSSModuleTestMixin):
    """
    Tests for L{nevow.athena.LivePage}
    """

    def setUp(self):
        """
        Create and remember a L{LivePage} instance.
        """
        self.page = athena.LivePage()


    def tearDown(self):
        """
        Shut this test's L{LivePage} timers down, if the test started them up.
        """
        # The deliverer only exists on pages for which a test created it.
        if hasattr(self.page, '_messageDeliverer'):
            self.page._messageDeliverer.close()


    def test_bootstrapCall(self):
        """
        L{LivePage._bootstrapCall} should generate a JSON-serialized string for
        calling a single JavaScript function.
        """
        bc = self.page._bootstrapCall(
            "SomeModule.someMethod", [u"one", 2, {u"three": 4.1}])
        self.assertEqual(
            bc, 'SomeModule.someMethod("one", 2, {"three":4.1});')


    def test_pageJsClassDependencies(self):
        """
        L{LivePage.render_liveglue} should include modules that the
        L{LivePage}'s jsClass depends on.
        """
        self.page.jsClass = u'PythonTestSupport.Dependor.PageTest'
        freq = FakeRequest()
        self.page._becomeLive(url.URL.fromRequest(freq))
        ctx = WovenContext(tag=tags.div())
        ctx.remember(freq, IRequest)
        self.assertEqual(self.page.render_liveglue(ctx, None), ctx.tag)
        expectDependor = flat.flatten(self.page.getImportStan(u'PythonTestSupport.Dependor'))
        expectDependee = flat.flatten(self.page.getImportStan(u'PythonTestSupport.Dependee'))
        result = flat.flatten(ctx.tag, ctx)
        self.assertIn(expectDependor, result)
        self.assertIn(expectDependee, result)


    def test_pageCSSModuleDependencies(self):
        """
        L{athena.LivePage.render_liveglue} should include CSS modules that
        the top-level C{cssModule} depends on.
        """
        self.page.cssModule = u'TestCSSModuleDependencies.Dependor'
        self.page.cssModules = self._makeCSSRegistry()

        self.page._becomeLive(url.URL())
        ctx = WovenContext(tag=tags.div())
        ctx.remember(FakeRequest(), IRequest)
        self.assertEqual(self.page.render_liveglue(ctx, None), ctx.tag)
        expected = flat.flatten(
            self.page.getStylesheetStan(
                [self.page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependee'),
                 self.page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependor')]))
        self.assertIn(expected, flat.flatten(ctx.tag, ctx))


    def test_bootstraps(self):
        """
        L{LivePage._bootstraps} should return a list of 2-tuples of
        (initialization method, arguments) of methods to call in JavaScript.

        Specifically, it should invoke Divmod.bootstrap with the page's own
        URL, and Nevow.Athena.bootstrap with the name of the client-side Page
        class to instantiate and the URL to instantiate it with.
        """
        # A path segment made of one single quote and one double quote.
        SEG = "'" + '"'
        req = FakeRequest(uri='/' + SEG, currentSegments=[SEG])
        ctx = WovenContext()
        ctx.remember(req, IRequest)
        self.page.clientID = 'asdf'
        self.assertEqual(
            self.page._bootstraps(ctx),
            [("Divmod.bootstrap",
              # Nevow's URL quoting rules are weird, but this is the URL
              # flattener's fault, not mine.  Adjust to taste if that changes
              # (it won't) -glyph
              [u"http://localhost/'%22"]),
             ("Nevow.Athena.bootstrap",
              [u'Nevow.Athena.PageWidget', u'asdf'])])


    def test_renderReconnect(self):
        """
        L{LivePage.renderHTTP} should render a JSON-encoded version of its
        clientID rather than a rendered version of its template when provided
        with a special __athena_reconnect__ parameter.
        """
        req = FakeRequest(args={athena.ATHENA_RECONNECT: ["1"]})
        ctx = WovenContext()
        ctx.remember(req, IRequest)
        string = self.page.renderHTTP(ctx)
        jsonifiedID = '"%s"' % (self.page.clientID,)
        self.assertEqual(string, jsonifiedID)


    def test_cssModules(self):
        """
        L{athena.LivePage.cssModules} should default to
        L{athena._theCSSRegistry}.
        """
        self.assertIdentical(
            athena.LivePage().cssModules, athena._theCSSRegistry)


    def test_cssmoduleChild(self):
        """
        L{athena.LivePage}'s C{cssmodule} child should return a correctly
        initialized L{athena.MappingResource}.
        """
        theCSSMapping = {}
        class MyCSSModules:
            mapping = theCSSMapping
        page = athena.LivePage()
        page.cssModules = MyCSSModules()
        (res, segments) = page.locateChild(None, ('cssmodule',))
        self.assertTrue(isinstance(res, athena.MappingResource))
        self.assertIdentical(res.mapping, theCSSMapping)


    def test_cssModuleRoot(self):
        """
        L{athena.LivePage}'s C{cssModuleRoot} argument should be observed by
        L{athena.LivePage.getCSSModuleURL}.
        """
        theCSSModuleRoot = url.URL.fromString('/test_cssModuleRoot')
        page = athena.LivePage(
            cssModuleRoot=theCSSModuleRoot)
        self.assertEqual(
            page.getCSSModuleURL(u'X.Y'),
            theCSSModuleRoot.child('X.Y'))
class WidgetSubcommandTests(unittest.TestCase):
    """
    Test the twistd subcommand which runs a server to render a single Athena
    widget.
    """
    def test_portOption(self):
        """
        Verify that the --port option adds an integer to the Options' port key.
        """
        options = widgetServiceMaker.options()
        # The element option holds a class (a factory), not an instance.
        options['element'] = DummyLiveElement
        options.parseOptions(['--port', '3874'])
        self.assertEqual(options['port'], 3874)
        options.parseOptions(['--port', '65535'])
        self.assertEqual(options['port'], 65535)


    def test_invalidPortOption(self):
        """
        Verify that non-integer and out-of-range port numbers are rejected.
        """
        options = widgetServiceMaker.options()
        options['element'] = DummyLiveElement
        self.assertRaises(UsageError, options.parseOptions, ['--port', 'hello world'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '-7'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '70000'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '65536'])


    def test_widgetOption(self):
        """
        Verify that the --element option adds a class to the Options' element
        key.
        """
        options = widgetServiceMaker.options()
        options.parseOptions(['--element', qual(DummyLiveElement)])
        self.assertEquals(options['element'], DummyLiveElement)


    def test_invalidWidgetOption(self):
        """
        Verify that specifying a non-existent class is rejected.
        """
        options = widgetServiceMaker.options()
        self.assertRaises(
            UsageError,
            options.parseOptions, ['--element', qual(DummyLiveElement) + 'xxx'])
        self.assertRaises(
            UsageError,
            options.parseOptions, ['--element', '-'])


    def test_invalidMissingWidget(self):
        """
        Verify that a missing widget class is rejected.
        """
        options = widgetServiceMaker.options()
        self.assertRaises(UsageError, options.parseOptions, [])


    def test_defaultPort(self):
        """
        Verify that the default port number is 8080.
        """
        options = widgetServiceMaker.options()
        options['element'] = DummyLiveElement
        options.parseOptions([])
        self.assertEqual(options['port'], 8080)


    def test_providesInterfaces(self):
        """
        Verify that the necessary interfaces for the object to be found as a
        twistd subcommand plugin are provided.
        """
        self.failUnless(IPlugin.providedBy(widgetServiceMaker))
        self.failUnless(IServiceMaker.providedBy(widgetServiceMaker))


    def test_makeService(self):
        """
        Verify that the L{IService} creation function returns a service which
        will run a Nevow site.
        """
        service = widgetServiceMaker.makeService({
            'element': DummyLiveElement,
            'port': 8080,
        })
        self.failUnless(isinstance(service, TCPServer))
        self.assertEqual(service.args[0], 8080)
        self.failUnless(isinstance(service.args[1], NevowSite))
        self.failUnless(isinstance(service.args[1].resource, WidgetPluginRoot))
        self.failUnless(isinstance(service.args[1].resource.elementFactory(),
                                   DummyLiveElement))


    def test_livePageRendering(self):
        """
        Verify that an L{ElementRenderingLivePage} instantiated with a
        particular LiveElement properly renders that element.
        """
        element = DummyLiveElement()
        element.jsClass = u'Dummy.ClassName'
        element.docFactory = stan('the element')
        page = ElementRenderingLivePage(element)
        renderDeferred = renderLivePage(page)
        def cbRendered(result):
            document = parseString(result)
            titles = document.getElementsByTagName('title')
            self.assertEqual(len(titles), 1)
            self.assertEqual(titles[0].firstChild.nodeValue, DummyLiveElement.__name__)
            divs = document.getElementsByTagName('div')
            self.assertEqual(len(divs), 1)
            self.assertEqual(divs[0].firstChild.nodeValue, 'the element')
        renderDeferred.addCallback(cbRendered)
        return renderDeferred


    def test_multipleRendersMultipleWidgets(self):
        """
        Each hit to the top-level page created by makeService should result in
        a new element being created by the specified element factory, so that
        it can be rendered multiple times.
        """
        w = WidgetPluginRoot(DummyLiveElement)
        # Only the located page matters here; the remaining segments do not.
        page1 = w.locateChild(None, [''])[0]
        page2 = w.locateChild(None, [''])[0]

        # Make sure the pages aren't the same.
        self.failUnless(isinstance(page1, ElementRenderingLivePage))
        self.failUnless(isinstance(page2, ElementRenderingLivePage))
        self.assertNotIdentical(page1, page2)

        # Make sure the elements aren't the same.
        self.assertNotEqual(page1.element.counter, page2.element.counter)


    def test_transportHookup(self):
        """
        When a LivePage is rendered, it needs to hook up to its transport,
        which is a special resource (associated with the particular LivePage
        object in memory).  This hookup is done by some special logic in
        LivePage.locateChild, among other places.  Let's make sure that we can
        look up the live page by its client ID with the default transport root.

        Athena's default transport root is whatever URL the page is rendered
        at.  In the case of this plugin, that will usually be
        http://localhost:8080/
        """
        w = WidgetPluginRoot(DummyLiveElement)
        page1 = w.locateChild(None, [''])[0]
        page1.element.docFactory = stan('the element')
        page1.element.jsClass = u'Dummy.ClassName'
        def cbCheckPageByClientID(result):
            req = FakeRequest()
            ctx = WovenContext()
            ctx.remember(req, IRequest)
            # Looking the page up by its client ID must yield the same object.
            page1prime = w.locateChild(ctx, [page1.clientID])[0]
            self.assertIdentical(page1prime, page1)
        renderDeferred = renderLivePage(page1)
        renderDeferred.addCallback(cbCheckPageByClientID)
        return renderDeferred
|