!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache/2.2.16 (Debian). PHP/5.3.3-7+squeeze19 

uname -a: Linux mail.tri-specialutilitydistrict.com 2.6.32-5-amd64 #1 SMP Tue May 13 16:34:35 UTC
2014 x86_64
 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/usr/share/pyshared/twisted/test/   drwxr-xr-x
Free 129.99 GB of 142.11 GB (91.47%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_stdio.py (13.78 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) 2006-2010 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.stdio}.
"""

import os, sys, itertools

from twisted.trial import unittest
from twisted.python import filepath, log
from twisted.python.runtime import platform
from twisted.internet import error, defer, protocol, stdio, reactor
from twisted.test.test_tcp import ConnectionLostNotifyingProtocol


# A short string which is intended to appear here and nowhere else,
# particularly not in any random garbage output CPython unavoidably
# generates (such as in warning text and so forth).  This is searched
# for in the output from stdio_test_lastwrite.py and if it is found at
# the end, the functionality works.
UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!'

# Skip message for the process-spawning tests in this module, or None when
# they can run.  It is only ever set on Windows, and only when the
# win32process module cannot be imported.
skipWindowsNopywin32 = None
if platform.isWindows():
    try:
        # Imported purely to find out whether it is available.
        import win32process
    except ImportError:
        skipWindowsNopywin32 = ("On windows, spawnProcess is not available "
                                "in the absence of win32process.")


class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol used by these tests to accumulate the output of a child
    process and to expose its lifecycle events as deferreds.

    @ivar onConnection: A L{defer.Deferred} fired with C{None} from
    C{connectionMade}.

    @ivar onCompletion: A L{defer.Deferred} fired from C{processEnded} with
    the C{reason} that method was given.

    @ivar onDataReceived: C{None} (the default), or a L{defer.Deferred} which
    is fired with this instance the next time C{childDataReceived} runs.  The
    attribute is reset to C{None} just before firing, so a given deferred is
    fired at most once.

    @ivar data: A C{dict} mapping each file descriptor to the concatenation
    of everything received from the child process on that descriptor.
    """
    onDataReceived = None

    def __init__(self):
        self.data = {}
        self.onConnection = defer.Deferred()
        self.onCompletion = defer.Deferred()


    def connectionMade(self):
        """
        Fire C{onConnection} with C{None}.
        """
        self.onConnection.callback(None)


    def childDataReceived(self, name, bytes):
        """
        Append C{bytes} to whatever has already been stored in C{data} under
        C{name}, then fire and clear C{onDataReceived} if one is set.
        """
        previous = self.data.get(name, '')
        self.data[name] = previous + bytes

        pending = self.onDataReceived
        if pending is not None:
            # Clear the attribute before firing so that any data arriving
            # later does not try to fire the same deferred a second time.
            self.onDataReceived = None
            pending.callback(self)


    def processEnded(self, reason):
        """
        Fire C{onCompletion} with C{reason}.
        """
        self.onCompletion.callback(reason)



class StandardInputOutputTestCase(unittest.TestCase):

    skip = skipWindowsNopywin32

    def _spawnProcess(self, proto, sibling, *args, **kw):
        """
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        """
        import twisted
        subenv = dict(os.environ)
        subenv['PYTHONPATH'] = os.pathsep.join(
            [os.path.abspath(
                    os.path.dirname(os.path.dirname(twisted.__file__))),
             subenv.get('PYTHONPATH', '')
             ])
        args = [sys.executable,
             filepath.FilePath(__file__).sibling(sibling).path,
             reactor.__class__.__module__] + list(args)
        return reactor.spawnProcess(
            proto,
            sys.executable,
            args,
            env=subenv,
            **kw)


    def _requireFailure(self, d, callback):
        def cb(result):
            self.fail("Process terminated with non-Failure: %r" % (result,))
        def eb(err):
            return callback(err)
        return d.addCallbacks(cb, eb)


    def test_loseConnection(self):
        """
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile)

        def processEnded(reason):
            # Copy the child's log to ours so it's more visible.
            for line in file(errorLogFile):
                log.msg("Child logged: " + line.rstrip())

            self.failIfIn(1, p.data)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_readConnectionLost(self):
        """
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        p.onDataReceived = defer.Deferred()

        def cbBytes(ignored):
            d = p.onCompletion
            p.transport.closeStdin()
            return d
        p.onDataReceived.addCallback(cbBytes)

        def processEnded(reason):
            reason.trap(error.ProcessDone)
        d = self._requireFailure(p.onDataReceived, processEnded)

        self._spawnProcess(
            p, 'stdio_test_halfclose.py', errorLogFile)
        return d


    def test_lastWriteReceived(self):
        """
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        """
        p = StandardIOTestProcessProtocol()

        # Note: the OS X bug which prompted the addition of this test
        # is an apparent race condition involving non-blocking PTYs.
        # Delaying the parent process significantly increases the
        # likelihood of the race going the wrong way.  If you need to
        # fiddle with this code at all, uncommenting the next line
        # will likely make your life much easier.  It is commented out
        # because it makes the test quite slow.

        # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))

        try:
            self._spawnProcess(
                p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING,
                usePTY=True)
        except ValueError, e:
            # Some platforms don't work with usePTY=True
            raise unittest.SkipTest(str(e))

        def processEnded(reason):
            """
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            """
            self.assertTrue(
                p.data[1].endswith(UNIQUE_LAST_WRITE_STRING),
                "Received %r from child, did not find expected bytes." % (
                    p.data,))
            reason.trap(error.ProcessDone)
        return self._requireFailure(p.onCompletion, processEnded)


    def test_hostAndPeer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_hostpeer.py')

        def processEnded(reason):
            host, peer = p.data[1].splitlines()
            self.failUnless(host)
            self.failUnless(peer)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_write(self):
        """
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_write.py')

        def processEnded(reason):
            self.assertEquals(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_writeSequence(self):
        """
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_writeseq.py')

        def processEnded(reason):
            self.assertEquals(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def _junkPath(self):
        junkPath = self.mktemp()
        junkFile = file(junkPath, 'w')
        for i in xrange(1024):
            junkFile.write(str(i) + '\n')
        junkFile.close()
        return junkPath


    def test_producer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        written = []
        toWrite = range(100)

        def connectionMade(ign):
            if toWrite:
                written.append(str(toWrite.pop()) + "\n")
                proc.write(written[-1])
                reactor.callLater(0.01, connectionMade, None)

        proc = self._spawnProcess(p, 'stdio_test_producer.py')

        p.onConnection.addCallback(connectionMade)

        def processEnded(reason):
            self.assertEquals(p.data[1], ''.join(written))
            self.failIf(toWrite, "Connection lost with %d writes left to go." % (len(toWrite),))
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_consumer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        junkPath = self._junkPath()

        self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)

        def processEnded(reason):
            self.assertEquals(p.data[1], file(junkPath).read())
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_normalFileStandardOut(self):
        """
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        """
        onConnLost = defer.Deferred()
        proto = ConnectionLostNotifyingProtocol(onConnLost)
        path = filepath.FilePath(self.mktemp())
        self.normal = normal = path.open('w')
        self.addCleanup(normal.close)

        kwargs = dict(stdout=normal.fileno())
        if not platform.isWindows():
            # Make a fake stdin so that StandardIO doesn't mess with the *real*
            # stdin.
            r, w = os.pipe()
            self.addCleanup(os.close, r)
            self.addCleanup(os.close, w)
            kwargs['stdin'] = r
        connection = stdio.StandardIO(proto, **kwargs)

        # The reactor needs to spin a bit before it might have incorrectly
        # decided stdout is closed.  Use this counter to keep track of how
        # much we've let it spin.  If it closes before we expected, this
        # counter will have a value that's too small and we'll know.
        howMany = 5
        count = itertools.count()

        def spin():
            for value in count:
                if value == howMany:
                    connection.loseConnection()
                    return
                connection.write(str(value))
                break
            reactor.callLater(0, spin)
        reactor.callLater(0, spin)

        # Once the connection is lost, make sure the counter is at the
        # appropriate value.
        def cbLost(reason):
            self.assertEquals(count.next(), howMany + 1)
            self.assertEquals(
                path.getContent(),
                ''.join(map(str, range(howMany))))
        onConnLost.addCallback(cbLost)
        return onConnLost
    # Class-body statements: attach a skip message to the test method
    # defined just above when the reactor or platform in use cannot run it.
    # The reactor is identified by the name of its class.
    if reactor.__class__.__name__ == 'EPollReactor':
        test_normalFileStandardOut.skip = (
            "epoll(7) does not support normal files.  See #4429.  "
            "This should be a todo but technical limitations prevent "
            "this.")
    elif platform.isWindows():
        test_normalFileStandardOut.skip = (
            "StandardIO does not accept stdout as an argument to Windows.  "
            "Testing redirection to a file is therefore harder.")


    def test_normalFileStandardOutGoodEpollError(self):
        """
        Using StandardIO with epollreactor with stdout redirected to a
        normal file fails with a comprehensible error (until it is
        supported, when #4429 is resolved).  See also #2259 and #3442.
        """
        path = filepath.FilePath(self.mktemp())
        normal = path.open('w')
        fd = normal.fileno()
        self.addCleanup(normal.close)
        exc = self.assertRaises(
            RuntimeError,
            stdio.StandardIO, protocol.Protocol(), stdout=fd)
        
        self.assertEquals(
            str(exc),
            "This reactor does not support this type of file descriptor (fd "
            "%d, mode %d) (for example, epollreactor does not support normal "
            "files.  See #4429)." % (fd, os.fstat(fd).st_mode))
    # Class-body statement: skip the test method defined just above unless
    # the running reactor's class is named EPollReactor.
    if reactor.__class__.__name__ != 'EPollReactor':
        test_normalFileStandardOutGoodEpollError.skip = (
            "Only epollreactor is expected to fail with stdout redirected "
            "to a normal file.")

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0094 ]--