!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache/2.2.16 (Debian). PHP/5.3.3-7+squeeze19 

uname -a: Linux mail.tri-specialutilitydistrict.com 2.6.32-5-amd64 #1 SMP Tue May 13 16:34:35 UTC
2014 x86_64
 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/usr/share/pyshared/twisted/internet/test/   drwxr-xr-x
Free 129.83 GB of 142.11 GB (91.36%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_core.py (9.66 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) 2008-2009 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorCore}.
"""

# Python 2 idiom: with this module-level default metaclass, classes defined
# below without explicit bases are created as new-style classes.
__metaclass__ = type

import signal
import time
import inspect

from twisted.internet.abstract import FileDescriptor
from twisted.internet.error import ReactorAlreadyRunning
from twisted.internet.defer import Deferred
from twisted.internet.test.reactormixins import ReactorBuilder



class ObjectModelIntegrationTest(ReactorBuilder):
    """
    Tests for how each reactor implementation fits into the Python object
    model.
    """

    def test_newstyleReactor(self):
        """
        Every class in the method resolution order of a reactor's type is a
        new-style class, i.e. a subclass of C{object}.
        """
        builtReactor = self.buildReactor()
        self.assertTrue(isinstance(builtReactor, object))
        # Walk the whole MRO so that a classic class hiding anywhere in the
        # inheritance chain is detected, not just the most derived type.
        for klass in inspect.getmro(type(builtReactor)):
            self.assertTrue(issubclass(klass, object))



class SystemEventTestsBuilder(ReactorBuilder):
    """
    Builder defining tests relating to L{IReactorCore.addSystemEventTrigger}
    and L{IReactorCore.fireSystemEvent}.

    Every test builds a fresh reactor with C{self.buildReactor()} and drives
    it directly, so the tests do not share reactor state with each other.
    """
    def test_stopWhenNotStarted(self):
        """
        C{reactor.stop()} raises L{RuntimeError} when called when the reactor
        has not been started.
        """
        reactor = self.buildReactor()
        self.assertRaises(RuntimeError, reactor.stop)


    def test_stopWhenAlreadyStopped(self):
        """
        C{reactor.stop()} raises L{RuntimeError} when called after the reactor
        has been stopped.
        """
        reactor = self.buildReactor()
        reactor.callWhenRunning(reactor.stop)
        reactor.run()
        # run() has returned, so the reactor has already been stopped once.
        self.assertRaises(RuntimeError, reactor.stop)


    def test_callWhenRunningOrder(self):
        """
        Functions are run in the order that they were passed to
        L{reactor.callWhenRunning}.
        """
        reactor = self.buildReactor()
        events = []
        reactor.callWhenRunning(events.append, "first")
        reactor.callWhenRunning(events.append, "second")
        reactor.callWhenRunning(reactor.stop)
        reactor.run()
        self.assertEqual(events, ["first", "second"])


    def test_runningForStartupEvents(self):
        """
        The reactor is not running when C{"before"} C{"startup"} triggers are
        called and is running when C{"during"} and C{"after"} C{"startup"}
        triggers are called.
        """
        reactor = self.buildReactor()
        # Maps startup phase name to the value of reactor.running observed
        # when that phase's trigger was invoked.
        state = {}
        def beforeStartup():
            state['before'] = reactor.running
        def duringStartup():
            state['during'] = reactor.running
        def afterStartup():
            state['after'] = reactor.running
        reactor.addSystemEventTrigger("before", "startup", beforeStartup)
        reactor.addSystemEventTrigger("during", "startup", duringStartup)
        reactor.addSystemEventTrigger("after", "startup", afterStartup)
        reactor.callWhenRunning(reactor.stop)
        # Registering the triggers must not run any of them.
        self.assertEqual(state, {})
        reactor.run()
        self.assertEqual(
            state,
            {"before": False,
             "during": True,
             "after": True})


    def test_signalHandlersInstalledDuringStartup(self):
        """
        Signal handlers are installed in response to the C{"during"}
        C{"startup"} event: after the C{"before"} triggers have run and
        before the C{"after"} triggers run.
        """
        reactor = self.buildReactor()
        # One-element lists act as mutable cells which the nested functions
        # below can assign into.
        phase = [None]
        def beforeStartup():
            phase[0] = "before"
        def afterStartup():
            phase[0] = "after"
        reactor.addSystemEventTrigger("before", "startup", beforeStartup)
        reactor.addSystemEventTrigger("after", "startup", afterStartup)

        sawPhase = [None]
        def fakeSignal(signum, action):
            # Record which startup phase was most recently entered at the
            # moment a signal handler is installed.
            sawPhase[0] = phase[0]
        self.patch(signal, 'signal', fakeSignal)
        reactor.callWhenRunning(reactor.stop)
        self.assertEqual(phase[0], None)
        self.assertEqual(sawPhase[0], None)
        reactor.run()
        self.assertEqual(sawPhase[0], "before")
        self.assertEqual(phase[0], "after")


    def test_stopShutDownEvents(self):
        """
        C{reactor.stop()} fires all three phases of shutdown event triggers
        before it makes C{reactor.run()} return.
        """
        reactor = self.buildReactor()
        events = []
        reactor.addSystemEventTrigger(
            "before", "shutdown",
            lambda: events.append(("before", "shutdown")))
        reactor.addSystemEventTrigger(
            "during", "shutdown",
            lambda: events.append(("during", "shutdown")))
        reactor.addSystemEventTrigger(
            "after", "shutdown",
            lambda: events.append(("after", "shutdown")))
        reactor.callWhenRunning(reactor.stop)
        reactor.run()
        self.assertEqual(events, [("before", "shutdown"),
                                  ("during", "shutdown"),
                                  ("after", "shutdown")])


    def test_shutdownFiresTriggersAsynchronously(self):
        """
        C{"before"} C{"shutdown"} triggers are not run synchronously from
        L{reactor.stop}.
        """
        reactor = self.buildReactor()
        events = []
        reactor.addSystemEventTrigger(
            "before", "shutdown", events.append, "before shutdown")
        def stopIt():
            reactor.stop()
            # If the trigger ran synchronously inside stop(), "before
            # shutdown" would already be in the list ahead of this entry.
            events.append("stopped")
        reactor.callWhenRunning(stopIt)
        self.assertEqual(events, [])
        reactor.run()
        self.assertEqual(events, ["stopped", "before shutdown"])


    def test_shutdownDisconnectsCleanly(self):
        """
        A L{IFileDescriptor.connectionLost} implementation which raises an
        exception does not prevent the remaining L{IFileDescriptor}s from
        having their C{connectionLost} method called.
        """
        # One-element list used as a mutable flag set by OKFileDescriptor.
        lostOK = [False]

        # Subclass FileDescriptor to get logPrefix
        class ProblematicFileDescriptor(FileDescriptor):
            def connectionLost(self, reason):
                raise RuntimeError("simulated connectionLost error")

        class OKFileDescriptor(FileDescriptor):
            def connectionLost(self, reason):
                lostOK[0] = True

        reactor = self.buildReactor()

        # Unfortunately, it is necessary to patch removeAll to directly control
        # the order of the returned values.  The test is only valid if
        # ProblematicFileDescriptor comes first.  Also, return these
        # descriptors only the first time removeAll is called so that if it is
        # called again the file descriptors aren't re-disconnected.
        fds = iter([ProblematicFileDescriptor(), OKFileDescriptor()])
        reactor.removeAll = lambda: fds
        reactor.callWhenRunning(reactor.stop)
        self.runReactor(reactor)
        # Exactly one RuntimeError (the simulated one) must have been logged.
        self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
        self.assertTrue(lostOK[0])


    def test_multipleRun(self):
        """
        C{reactor.run()} raises L{ReactorAlreadyRunning} when called when
        the reactor is already running.
        """
        events = []
        def reentrantRun():
            self.assertRaises(ReactorAlreadyRunning, reactor.run)
            # Proves that this function, and so the assertion above, ran.
            events.append("tested")
        reactor = self.buildReactor()
        reactor.callWhenRunning(reentrantRun)
        reactor.callWhenRunning(reactor.stop)
        reactor.run()
        self.assertEqual(events, ["tested"])


    def test_runWithAsynchronousBeforeStartupTrigger(self):
        """
        When there is a C{'before'} C{'startup'} trigger which returns an
        unfired L{Deferred}, C{reactor.run()} starts the reactor and does not
        return until after C{reactor.stop()} is called.
        """
        events = []
        def trigger():
            events.append('trigger')
            d = Deferred()
            d.addCallback(callback)
            # Fire the Deferred from a delayed call so that it is still
            # unfired when this trigger returns it.
            reactor.callLater(0, d.callback, None)
            return d
        def callback(ignored):
            events.append('callback')
            reactor.stop()
        reactor = self.buildReactor()
        reactor.addSystemEventTrigger('before', 'startup', trigger)
        reactor.run()
        self.assertEqual(events, ['trigger', 'callback'])


    def test_iterate(self):
        """
        C{reactor.iterate()} does not block.
        """
        reactor = self.buildReactor()
        # A pending delayed call five seconds out; iterate(0) must return
        # without waiting for it.
        t = reactor.callLater(5, reactor.crash)

        start = time.time()
        reactor.iterate(0) # Shouldn't block
        elapsed = time.time() - start

        self.assertTrue(elapsed < 2)
        t.cancel()


    def test_crash(self):
        """
        C{reactor.crash()} stops the reactor and does not fire shutdown
        triggers.
        """
        reactor = self.buildReactor()
        events = []
        reactor.addSystemEventTrigger(
            "before", "shutdown",
            lambda: events.append(("before", "shutdown")))
        # Once the reactor is running, schedule crash() as a delayed call
        # instead of calling it directly from the callWhenRunning hook.
        reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
        reactor.run()
        self.assertFalse(reactor.running)
        self.assertFalse(
            events,
            "Shutdown triggers invoked but they should not have been.")


    def test_runAfterCrash(self):
        """
        C{reactor.run()} restarts the reactor after it has been stopped by
        C{reactor.crash()}.
        """
        events = []
        def crash():
            events.append('crash')
            reactor.crash()
        reactor = self.buildReactor()
        reactor.callWhenRunning(crash)
        reactor.run()
        def stop():
            # Record reactor.running to show the second run() really
            # restarted the reactor.
            events.append(('stop', reactor.running))
            reactor.stop()
        reactor.callWhenRunning(stop)
        reactor.run()
        self.assertEqual(events, ['crash', ('stop', True)])


# Merge whatever each builder's makeTestCaseClasses() returns into this
# module's namespace.
# NOTE(review): presumably these are the concrete per-reactor TestCase
# classes, exposed at module level so the test runner can discover them --
# confirm against ReactorBuilder.makeTestCaseClasses.
globals().update(SystemEventTestsBuilder.makeTestCaseClasses())
globals().update(ObjectModelIntegrationTest.makeTestCaseClasses())

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0089 ]--