!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache/2.2.16 (Debian). PHP/5.3.3-7+squeeze19 

uname -a: Linux mail.tri-specialutilitydistrict.com 2.6.32-5-amd64 #1 SMP Tue May 13 16:34:35 UTC
2014 x86_64
 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/usr/share/pyshared/twisted/test/   drwxr-xr-x
Free 130.02 GB of 142.11 GB (91.49%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_threadpool.py (17.14 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.threadpool}
"""

import pickle, time, weakref, gc, threading

from twisted.trial import unittest, util
from twisted.python import threadpool, threadable, failure, context
from twisted.internet import reactor
from twisted.internet.defer import Deferred

#
# See the end of this module for the remainder of the imports.
#

class Synchronization(object):
    failures = 0

    def __init__(self, N, waiting):
        self.N = N
        self.waiting = waiting
        self.lock = threading.Lock()
        self.runs = []

    def run(self):
        # This is the testy part: this is supposed to be invoked
        # serially from multiple threads.  If that is actually the
        # case, we will never fail to acquire this lock.  If it is
        # *not* the case, we might get here while someone else is
        # holding the lock.
        if self.lock.acquire(False):
            if not len(self.runs) % 5:
                time.sleep(0.0002) # Constant selected based on
                                   # empirical data to maximize the
                                   # chance of a quick failure if this
                                   # code is broken.
            self.lock.release()
        else:
            self.failures += 1

        # This is just the only way I can think of to wake up the test
        # method.  It doesn't actually have anything to do with the
        # test.
        self.lock.acquire()
        self.runs.append(None)
        if len(self.runs) == self.N:
            self.waiting.release()
        self.lock.release()

    synchronized = ["run"]
threadable.synchronize(Synchronization)



class ThreadPoolTestCase(unittest.TestCase):
    """
    Test threadpools.
    """
    def _waitForLock(self, lock):
        for i in xrange(1000000):
            if lock.acquire(False):
                break
            time.sleep(1e-5)
        else:
            self.fail("A long time passed without succeeding")


    def test_attributes(self):
        """
        L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
        L{ThreadPool.__init__}.
        """
        pool = threadpool.ThreadPool(12, 22)
        self.assertEqual(pool.min, 12)
        self.assertEqual(pool.max, 22)


    def test_start(self):
        """
        L{ThreadPool.start} creates the minimum number of threads specified.
        """
        pool = threadpool.ThreadPool(0, 5)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 0)

        pool = threadpool.ThreadPool(3, 10)
        self.assertEqual(len(pool.threads), 0)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 3)


    def test_threadCreationArguments(self):
        """
        Test that creating threads in the threadpool with application-level
        objects as arguments doesn't results in those objects never being
        freed, with the thread maintaining a reference to them as long as it
        exists.
        """
        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # Here's our function
        def worker(arg):
            pass
        # weakref needs an object subclass
        class Dumb(object):
            pass
        # And here's the unique object
        unique = Dumb()

        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThread(worker, unique)

        # Add an event to wait completion
        event = threading.Event()
        tp.callInThread(event.set)
        event.wait(self.getTimeout())

        del worker
        del unique
        gc.collect()
        self.assertEquals(uniqueRef(), None)
        self.assertEquals(workerRef(), None)


    def test_threadCreationArgumentsCallInThreadWithCallback(self):
        """
        As C{test_threadCreationArguments} above, but for
        callInThreadWithCallback.
        """

        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # this holds references obtained in onResult
        refdict = {} # name -> ref value

        onResultWait = threading.Event()
        onResultDone = threading.Event()

        resultRef = []

        # result callback
        def onResult(success, result):
            onResultWait.wait(self.getTimeout())
            refdict['workerRef'] = workerRef()
            refdict['uniqueRef'] = uniqueRef()
            onResultDone.set()
            resultRef.append(weakref.ref(result))

        # Here's our function
        def worker(arg, test):
            return Dumb()

        # weakref needs an object subclass
        class Dumb(object):
            pass

        # And here's the unique object
        unique = Dumb()

        onResultRef = weakref.ref(onResult)
        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThreadWithCallback(onResult, worker, unique, test=unique)

        del worker
        del unique
        gc.collect()

        # let onResult collect the refs
        onResultWait.set()
        # wait for onResult
        onResultDone.wait(self.getTimeout())

        self.assertEquals(uniqueRef(), None)
        self.assertEquals(workerRef(), None)

        # XXX There's a race right here - has onResult in the worker thread
        # returned and the locals in _worker holding it and the result been
        # deleted yet?

        del onResult
        gc.collect()
        self.assertEqual(onResultRef(), None)
        self.assertEqual(resultRef[0](), None)


    def test_persistence(self):
        """
        Threadpools can be pickled and unpickled, which should preserve the
        number of threads and other parameters.
        """
        pool = threadpool.ThreadPool(7, 20)

        self.assertEquals(pool.min, 7)
        self.assertEquals(pool.max, 20)

        # check that unpickled threadpool has same number of threads
        copy = pickle.loads(pickle.dumps(pool))

        self.assertEquals(copy.min, 7)
        self.assertEquals(copy.max, 20)


    def _threadpoolTest(self, method):
        """
        Test synchronization of calls made with C{method}, which should be
        one of the mechanisms of the threadpool to execute work in threads.
        """
        # This is a schizophrenic test: it seems to be trying to test
        # both the callInThread()/dispatch() behavior of the ThreadPool as well
        # as the serialization behavior of threadable.synchronize().  It
        # would probably make more sense as two much simpler tests.
        N = 10

        tp = threadpool.ThreadPool()
        tp.start()
        self.addCleanup(tp.stop)

        waiting = threading.Lock()
        waiting.acquire()
        actor = Synchronization(N, waiting)

        for i in xrange(N):
            method(tp, actor)

        self._waitForLock(waiting)

        self.failIf(actor.failures, "run() re-entered %d times" %
                                    (actor.failures,))


    def test_dispatch(self):
        """
        Call C{_threadpoolTest} with C{dispatch}.
        """
        return self._threadpoolTest(
            lambda tp, actor: tp.dispatch(actor, actor.run))

    test_dispatch.suppress = [util.suppress(
                message="dispatch\(\) is deprecated since Twisted 8.0, "
                        "use callInThread\(\) instead",
                category=DeprecationWarning)]


    def test_callInThread(self):
        """
        Call C{_threadpoolTest} with C{callInThread}.
        """
        return self._threadpoolTest(
            lambda tp, actor: tp.callInThread(actor.run))


    def test_callInThreadException(self):
        """
        L{ThreadPool.callInThread} logs exceptions raised by the callable it
        is passed.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(raiseError)
        tp.start()
        tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)


    def test_callInThreadWithCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(True, result)} where C{result} is the value returned
        by the callable supplied.
        """
        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda : "test")
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertTrue(results[0])
        self.assertEqual(results[1], "test")


    def test_callInThreadWithCallbackExceptionInCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(False, failure)} where C{failure} represents the
        exception raised by the callable supplied.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, raiseError)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertFalse(results[0])
        self.assertTrue(isinstance(results[1], failure.Failure))
        self.assertTrue(issubclass(results[1].type, NewError))


    def test_callInThreadWithCallbackExceptionInOnResult(self):
        """
        L{ThreadPool.callInThreadWithCallback} logs the exception raised by
        C{onResult}.
        """
        class NewError(Exception):
            pass

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            results.append(success)
            results.append(result)
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda : None)
        tp.callInThread(waiter.release)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)

        self.assertTrue(results[0])
        self.assertEqual(results[1], None)


    def test_callbackThread(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls the function it is
        given and the C{onResult} callback in the same thread.
        """
        threadIds = []

        import thread

        event = threading.Event()

        def onResult(success, result):
            threadIds.append(thread.get_ident())
            event.set()

        def func():
            threadIds.append(thread.get_ident())

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())
        self.assertEqual(len(threadIds), 2)
        self.assertEqual(threadIds[0], threadIds[1])


    def test_callbackContext(self):
        """
        The context L{ThreadPool.callInThreadWithCallback} is invoked in is
        shared by the context the callable and C{onResult} callback are
        invoked in.
        """
        myctx = context.theContextTracker.currentContext().contexts[-1]
        myctx['testing'] = 'this must be present'

        contexts = []

        event = threading.Event()

        def onResult(success, result):
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)
            event.set()

        def func():
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())

        self.assertEqual(len(contexts), 2)
        self.assertEqual(myctx, contexts[0])
        self.assertEqual(myctx, contexts[1])


    def test_existingWork(self):
        """
        Work added to the threadpool before its start should be executed once
        the threadpool is started: this is ensured by trying to release a lock
        previously acquired.
        """
        waiter = threading.Lock()
        waiter.acquire()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(waiter.release) # before start()
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()


    def test_dispatchDeprecation(self):
        """
        Test for the deprecation of the dispatch method.
        """
        tp = threadpool.ThreadPool()
        tp.start()
        self.addCleanup(tp.stop)

        def cb():
            return tp.dispatch(None, lambda: None)

        self.assertWarns(DeprecationWarning,
                         "dispatch() is deprecated since Twisted 8.0, "
                         "use callInThread() instead",
                         __file__, cb)


    def test_dispatchWithCallbackDeprecation(self):
        """
        Test for the deprecation of the dispatchWithCallback method.
        """
        tp = threadpool.ThreadPool()
        tp.start()
        self.addCleanup(tp.stop)

        def cb():
            return tp.dispatchWithCallback(
                None,
                lambda x: None,
                lambda x: None,
                lambda: None)

        self.assertWarns(DeprecationWarning,
                     "dispatchWithCallback() is deprecated since Twisted 8.0, "
                     "use twisted.internet.threads.deferToThread() instead.",
                     __file__, cb)



class RaceConditionTestCase(unittest.TestCase):
    def setUp(self):
        self.event = threading.Event()
        self.threadpool = threadpool.ThreadPool(0, 10)
        self.threadpool.start()


    def tearDown(self):
        del self.event
        self.threadpool.stop()
        del self.threadpool


    def test_synchronization(self):
        """
        Test a race condition: ensure that actions run in the pool synchronize
        with actions run in the main thread.
        """
        timeout = self.getTimeout()
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        self.event.clear()
        for i in range(3):
            self.threadpool.callInThread(self.event.wait)
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        if not self.event.isSet():
            self.event.set()
            self.fail("Actions not synchronized")


    def test_singleThread(self):
        """
        The submission of a new job to a thread pool in response to the
        C{onResult} callback does not cause a new thread to be added to the
        thread pool.

        This requires that the thread which calls C{onResult} to have first
        marked itself as available so that when the new job is queued, that
        thread may be considered to run it.  This is desirable so that when
        only N jobs are ever being executed in the thread pool at once only
        N threads will ever be created.
        """
        # Ensure no threads running
        self.assertEquals(self.threadpool.workers, 0)

        loopDeferred = Deferred()

        def onResult(success, counter):
            reactor.callFromThread(submit, counter)

        def submit(counter):
            if counter:
                self.threadpool.callInThreadWithCallback(
                    onResult, lambda: counter - 1)
            else:
                loopDeferred.callback(None)

        def cbLoop(ignored):
            # Ensure there is only one thread running.
            self.assertEqual(self.threadpool.workers, 1)

        loopDeferred.addCallback(cbLoop)
        submit(10)
        return loopDeferred



class ThreadSafeListDeprecationTestCase(unittest.TestCase):
    """
    Test deprecation of threadpool.ThreadSafeList in twisted.python.threadpool
    """

    def test_threadSafeList(self):
        """
        Test deprecation of L{threadpool.ThreadSafeList}.
        """
        threadpool.ThreadSafeList()

        warningsShown = self.flushWarnings([self.test_threadSafeList])
        self.assertEquals(len(warningsShown), 1)
        self.assertIdentical(warningsShown[0]['category'], DeprecationWarning)
        self.assertEquals(
            warningsShown[0]['message'],
            "twisted.python.threadpool.ThreadSafeList was deprecated in "
            "Twisted 10.1.0: This was an internal implementation detail of "
            "support for Jython 2.1, which is now obsolete.")

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0087 ]--