!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache/2.2.16 (Debian). PHP/5.3.3-7+squeeze19 

uname -a: Linux mail.tri-specialutilitydistrict.com 2.6.32-5-amd64 #1 SMP Tue May 13 16:34:35 UTC
2014 x86_64
 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/usr/share/pyshared/twisted/web/test/   drwxr-xr-x
Free 129.89 GB of 142.11 GB (91.4%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_xmlrpc.py (21.05 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- test-case-name: twisted.web.test.test_xmlrpc -*-
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for  XML-RPC support in L{twisted.web.xmlrpc}.
"""

import datetime
import xmlrpclib
from StringIO import StringIO

from twisted.trial import unittest
from twisted.web import xmlrpc
from twisted.web.xmlrpc import (
    XMLRPC, payloadTemplate, addIntrospection, _QueryFactory, Proxy)
from twisted.web import server, static, client, error, http
from twisted.internet import reactor, defer
from twisted.internet.error import ConnectionDone
from twisted.python import failure
from twisted.web.test.test_web import DummyRequest


class AsyncXMLRPCTests(unittest.TestCase):
    """
    Tests for L{XMLRPC}'s support of Deferreds.
    """
    def setUp(self):
        self.request = DummyRequest([''])
        self.request.method = 'POST'
        self.request.content = StringIO(
            payloadTemplate % ('async', xmlrpclib.dumps(())))

        result = self.result = defer.Deferred()
        class AsyncResource(XMLRPC):
            def xmlrpc_async(self):
                return result

        self.resource = AsyncResource()


    def test_deferredResponse(self):
        """
        If an L{XMLRPC} C{xmlrpc_*} method returns a L{defer.Deferred}, the
        response to the request is the result of that L{defer.Deferred}.
        """
        self.resource.render(self.request)
        self.assertEquals(self.request.written, [])

        self.result.callback("result")

        resp = xmlrpclib.loads("".join(self.request.written))
        self.assertEquals(resp, (('result',), None))
        self.assertEquals(self.request.finished, 1)


    def test_interruptedDeferredResponse(self):
        """
        While waiting for the L{Deferred} returned by an L{XMLRPC} C{xmlrpc_*}
        method to fire, the connection the request was issued over may close.
        If this happens, neither C{write} nor C{finish} is called on the
        request.
        """
        self.resource.render(self.request)
        self.request.processingFailed(
            failure.Failure(ConnectionDone("Simulated")))
        self.result.callback("result")
        self.assertEquals(self.request.written, [])
        self.assertEquals(self.request.finished, 0)



class TestRuntimeError(RuntimeError):
    pass



class TestValueError(ValueError):
    pass



class Test(XMLRPC):

    # If you add xmlrpc_ methods to this class, go change test_listMethods
    # below.

    FAILURE = 666
    NOT_FOUND = 23
    SESSION_EXPIRED = 42

    def xmlrpc_echo(self, arg):
        return arg

    # the doc string is part of the test
    def xmlrpc_add(self, a, b):
        """
        This function add two numbers.
        """
        return a + b

    xmlrpc_add.signature = [['int', 'int', 'int'],
                            ['double', 'double', 'double']]

    # the doc string is part of the test
    def xmlrpc_pair(self, string, num):
        """
        This function puts the two arguments in an array.
        """
        return [string, num]

    xmlrpc_pair.signature = [['array', 'string', 'int']]

    # the doc string is part of the test
    def xmlrpc_defer(self, x):
        """Help for defer."""
        return defer.succeed(x)

    def xmlrpc_deferFail(self):
        return defer.fail(TestValueError())

    # don't add a doc string, it's part of the test
    def xmlrpc_fail(self):
        raise TestRuntimeError

    def xmlrpc_fault(self):
        return xmlrpc.Fault(12, "hello")

    def xmlrpc_deferFault(self):
        return defer.fail(xmlrpc.Fault(17, "hi"))

    def xmlrpc_complex(self):
        return {"a": ["b", "c", 12, []], "D": "foo"}

    def xmlrpc_dict(self, map, key):
        return map[key]
    xmlrpc_dict.help = 'Help for dict.'

    def _getFunction(self, functionPath):
        try:
            return XMLRPC._getFunction(self, functionPath)
        except xmlrpc.NoSuchFunction:
            if functionPath.startswith("SESSION"):
                raise xmlrpc.Fault(self.SESSION_EXPIRED,
                                   "Session non-existant/expired.")
            else:
                raise


class TestAuthHeader(Test):
    """
    This is used to get the header info so that we can test
    authentication.
    """
    def __init__(self):
        Test.__init__(self)
        self.request = None

    def render(self, request):
        self.request = request
        return Test.render(self, request)

    def xmlrpc_authinfo(self):
        return self.request.getUser(), self.request.getPassword()


class TestQueryProtocol(xmlrpc.QueryProtocol):
    """
    QueryProtocol for tests that saves headers received inside the factory.
    """
    def handleHeader(self, key, val):
        self.factory.headers[key.lower()] = val


class TestQueryFactory(xmlrpc._QueryFactory):
    """
    QueryFactory using L{TestQueryProtocol} for saving headers.
    """
    protocol = TestQueryProtocol

    def __init__(self, *args, **kwargs):
        self.headers = {}
        xmlrpc._QueryFactory.__init__(self, *args, **kwargs)


class TestQueryFactoryCancel(xmlrpc._QueryFactory):
    """
    QueryFactory that saves a reference to the
    L{twisted.internet.interfaces.IConnector} to test connection lost.
    """

    def startedConnecting(self, connector):
        self.connector = connector


class XMLRPCTestCase(unittest.TestCase):

    def setUp(self):
        self.p = reactor.listenTCP(0, server.Site(Test()),
                                   interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []

    def tearDown(self):
        self.factories = []
        return self.p.stopListening()

    def queryFactory(self, *args, **kwargs):
        """
        Specific queryFactory for proxy that uses our custom
        L{TestQueryFactory}, and save factories.
        """
        factory = TestQueryFactory(*args, **kwargs)
        self.factories.append(factory)
        return factory

    def proxy(self, factory=None):
        """
        Return a new xmlrpc.Proxy for the test site created in
        setUp(), using the given factory as the queryFactory, or
        self.queryFactory if no factory is provided.
        """
        p = xmlrpc.Proxy("http://127.0.0.1:%d/" % self.port)
        if factory is None:
            p.queryFactory = self.queryFactory
        else:
            p.queryFactory = factory
        return p

    def test_results(self):
        inputOutput = [
            ("add", (2, 3), 5),
            ("defer", ("a",), "a"),
            ("dict", ({"a": 1}, "a"), 1),
            ("pair", ("a", 1), ["a", 1]),
            ("complex", (), {"a": ["b", "c", 12, []], "D": "foo"})]

        dl = []
        for meth, args, outp in inputOutput:
            d = self.proxy().callRemote(meth, *args)
            d.addCallback(self.assertEquals, outp)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)

    def test_errors(self):
        """
        Verify that for each way a method exposed via XML-RPC can fail, the
        correct 'Content-type' header is set in the response and that the
        client-side Deferred is errbacked with an appropriate C{Fault}
        instance.
        """
        dl = []
        for code, methodName in [(666, "fail"), (666, "deferFail"),
                                 (12, "fault"), (23, "noSuchMethod"),
                                 (17, "deferFault"), (42, "SESSION_TEST")]:
            d = self.proxy().callRemote(methodName)
            d = self.assertFailure(d, xmlrpc.Fault)
            d.addCallback(lambda exc, code=code:
                self.assertEquals(exc.faultCode, code))
            dl.append(d)
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def cb(ign):
            for factory in self.factories:
                self.assertEquals(factory.headers['content-type'],
                                  'text/xml')
            self.flushLoggedErrors(TestRuntimeError, TestValueError)
        d.addCallback(cb)
        return d


    def test_cancel(self):
        """
        A deferred from the Proxy can be cancelled, disconnecting
        the L{twisted.internet.interfaces.IConnector}.
        """
        def factory(*args, **kw):
            factory.f = TestQueryFactoryCancel(*args, **kw)
            return factory.f
        d = self.proxy(factory).callRemote('add', 2, 3)
        self.assertNotEquals(factory.f.connector.state, "disconnected")
        d.cancel()
        self.assertEquals(factory.f.connector.state, "disconnected")
        d = self.assertFailure(d, defer.CancelledError)
        return d


    def test_errorGet(self):
        """
        A classic GET on the xml server should return a NOT_ALLOWED.
        """
        d = client.getPage("http://127.0.0.1:%d/" % (self.port,))
        d = self.assertFailure(d, error.Error)
        d.addCallback(
            lambda exc: self.assertEquals(int(exc.args[0]), http.NOT_ALLOWED))
        return d

    def test_errorXMLContent(self):
        """
        Test that an invalid XML input returns an L{xmlrpc.Fault}.
        """
        d = client.getPage("http://127.0.0.1:%d/" % (self.port,),
                           method="POST", postdata="foo")
        def cb(result):
            self.assertRaises(xmlrpc.Fault, xmlrpclib.loads, result)
        d.addCallback(cb)
        return d


    def test_datetimeRoundtrip(self):
        """
        If an L{xmlrpclib.DateTime} is passed as an argument to an XML-RPC
        call and then returned by the server unmodified, the result should
        be equal to the original object.
        """
        when = xmlrpclib.DateTime()
        d = self.proxy().callRemote("echo", when)
        d.addCallback(self.assertEqual, when)
        return d


    def test_doubleEncodingError(self):
        """
        If it is not possible to encode a response to the request (for example,
        because L{xmlrpclib.dumps} raises an exception when encoding a
        L{Fault}) the exception which prevents the response from being
        generated is logged and the request object is finished anyway.
        """
        d = self.proxy().callRemote("echo", "")

        # *Now* break xmlrpclib.dumps.  Hopefully the client already used it.
        def fakeDumps(*args, **kwargs):
            raise RuntimeError("Cannot encode anything at all!")
        self.patch(xmlrpclib, 'dumps', fakeDumps)

        # It doesn't matter how it fails, so long as it does.  Also, it happens
        # to fail with an implementation detail exception right now, not
        # something suitable as part of a public interface.
        d = self.assertFailure(d, Exception)

        def cbFailed(ignored):
            # The fakeDumps exception should have been logged.
            self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
        d.addCallback(cbFailed)
        return d



class XMLRPCTestCase2(XMLRPCTestCase):
    """
    Test with proxy that doesn't add a slash.
    """

    def proxy(self, factory=None):
        p = xmlrpc.Proxy("http://127.0.0.1:%d" % self.port)
        if factory is None:
            p.queryFactory = self.queryFactory
        else:
            p.queryFactory = factory
        return p


class SerializationConfigMixin:
    """
    Mixin which defines a couple tests which should pass when a particular flag
    is passed to L{XMLRPC}.

    These are not meant to be exhaustive serialization tests, since L{xmlrpclib}
    does all of the actual serialization work.  They are just meant to exercise
    a few codepaths to make sure we are calling into xmlrpclib correctly.

    @ivar flagName: A C{str} giving the name of the flag which must be passed to
        L{XMLRPC} to allow the tests to pass.  Subclasses should set this.

    @ivar value: A value which the specified flag will allow the serialization
        of.  Subclasses should set this.
    """
    def setUp(self):
        """
        Create a new XML-RPC server with C{allowNone} set to C{True}.
        """
        kwargs = {self.flagName: True}
        self.p = reactor.listenTCP(
            0, server.Site(Test(**kwargs)), interface="127.0.0.1")
        self.addCleanup(self.p.stopListening)
        self.port = self.p.getHost().port
        self.proxy = xmlrpc.Proxy(
            "http://127.0.0.1:%d/" % (self.port,), **kwargs)


    def test_roundtripValue(self):
        """
        C{self.value} can be round-tripped over an XMLRPC method call/response.
        """
        d = self.proxy.callRemote('defer', self.value)
        d.addCallback(self.assertEquals, self.value)
        return d


    def test_roundtripNestedValue(self):
        """
        A C{dict} which contains C{self.value} can be round-tripped over an
        XMLRPC method call/response.
        """
        d = self.proxy.callRemote('defer', {'a': self.value})
        d.addCallback(self.assertEquals, {'a': self.value})
        return d



class XMLRPCAllowNoneTestCase(SerializationConfigMixin, unittest.TestCase):
    """
    Tests for passing C{None} when the C{allowNone} flag is set.
    """
    flagName = "allowNone"
    value = None


try:
    xmlrpclib.loads(xmlrpclib.dumps(({}, {})), use_datetime=True)
except TypeError:
    _datetimeSupported = False
else:
    _datetimeSupported = True



class XMLRPCUseDateTimeTestCase(SerializationConfigMixin, unittest.TestCase):
    """
    Tests for passing a C{datetime.datetime} instance when the C{useDateTime}
    flag is set.
    """
    flagName = "useDateTime"
    value = datetime.datetime(2000, 12, 28, 3, 45, 59)

    if not _datetimeSupported:
        skip = (
            "Available version of xmlrpclib does not support datetime "
            "objects.")



class XMLRPCDisableUseDateTimeTestCase(unittest.TestCase):
    """
    Tests for the C{useDateTime} flag on Python 2.4.
    """
    if _datetimeSupported:
        skip = (
            "Available version of xmlrpclib supports datetime objects.")

    def test_cannotInitializeWithDateTime(self):
        """
        L{XMLRPC} raises L{RuntimeError} if passed C{True} for C{useDateTime}.
        """
        self.assertRaises(RuntimeError, XMLRPC, useDateTime=True)
        self.assertRaises(
            RuntimeError, Proxy, "http://localhost/", useDateTime=True)


    def test_cannotSetDateTime(self):
        """
        Setting L{XMLRPC.useDateTime} to C{True} after initialization raises
        L{RuntimeError}.
        """
        xmlrpc = XMLRPC(useDateTime=False)
        self.assertRaises(RuntimeError, setattr, xmlrpc, "useDateTime", True)
        proxy = Proxy("http://localhost/", useDateTime=False)
        self.assertRaises(RuntimeError, setattr, proxy, "useDateTime", True)



class XMLRPCTestAuthenticated(XMLRPCTestCase):
    """
    Test with authenticated proxy. We run this with the same inout/ouput as
    above.
    """
    user = "username"
    password = "asecret"

    def setUp(self):
        self.p = reactor.listenTCP(0, server.Site(TestAuthHeader()),
                                   interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []


    def test_authInfoInURL(self):
        p = xmlrpc.Proxy("http://%s:%s@127.0.0.1:%d/" % (
            self.user, self.password, self.port))
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEquals, [self.user, self.password])
        return d


    def test_explicitAuthInfo(self):
        p = xmlrpc.Proxy("http://127.0.0.1:%d/" % (
            self.port,), self.user, self.password)
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEquals, [self.user, self.password])
        return d


    def test_explicitAuthInfoOverride(self):
        p = xmlrpc.Proxy("http://wrong:info@127.0.0.1:%d/" % (
                self.port,), self.user, self.password)
        d = p.callRemote("authinfo")
        d.addCallback(self.assertEquals, [self.user, self.password])
        return d


class XMLRPCTestIntrospection(XMLRPCTestCase):

    def setUp(self):
        xmlrpc = Test()
        addIntrospection(xmlrpc)
        self.p = reactor.listenTCP(0, server.Site(xmlrpc),interface="127.0.0.1")
        self.port = self.p.getHost().port
        self.factories = []

    def test_listMethods(self):

        def cbMethods(meths):
            meths.sort()
            self.assertEqual(
                meths,
                ['add', 'complex', 'defer', 'deferFail',
                 'deferFault', 'dict', 'echo', 'fail', 'fault',
                 'pair', 'system.listMethods',
                 'system.methodHelp',
                 'system.methodSignature'])

        d = self.proxy().callRemote("system.listMethods")
        d.addCallback(cbMethods)
        return d

    def test_methodHelp(self):
        inputOutputs = [
            ("defer", "Help for defer."),
            ("fail", ""),
            ("dict", "Help for dict.")]

        dl = []
        for meth, expected in inputOutputs:
            d = self.proxy().callRemote("system.methodHelp", meth)
            d.addCallback(self.assertEquals, expected)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)

    def test_methodSignature(self):
        inputOutputs = [
            ("defer", ""),
            ("add", [['int', 'int', 'int'],
                     ['double', 'double', 'double']]),
            ("pair", [['array', 'string', 'int']])]

        dl = []
        for meth, expected in inputOutputs:
            d = self.proxy().callRemote("system.methodSignature", meth)
            d.addCallback(self.assertEquals, expected)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True)


class XMLRPCClientErrorHandling(unittest.TestCase):
    """
    Test error handling on the xmlrpc client.
    """
    def setUp(self):
        self.resource = static.Data(
            "This text is not a valid XML-RPC response.",
            "text/plain")
        self.resource.isLeaf = True
        self.port = reactor.listenTCP(0, server.Site(self.resource),
                                                     interface='127.0.0.1')

    def tearDown(self):
        return self.port.stopListening()

    def test_erroneousResponse(self):
        """
        Test that calling the xmlrpc client on a static http server raises
        an exception.
        """
        proxy = xmlrpc.Proxy("http://127.0.0.1:%d/" %
                             (self.port.getHost().port,))
        return self.assertFailure(proxy.callRemote("someMethod"), Exception)



class TestQueryFactoryParseResponse(unittest.TestCase):
    """
    Test the behaviour of L{_QueryFactory.parseResponse}.
    """

    def setUp(self):
        # The _QueryFactory that we are testing. We don't care about any
        # of the constructor parameters.
        self.queryFactory = _QueryFactory(
            path=None, host=None, method='POST', user=None, password=None,
            allowNone=False, args=())
        # An XML-RPC response that will parse without raising an error.
        self.goodContents = xmlrpclib.dumps(('',))
        # An 'XML-RPC response' that will raise a parsing error.
        self.badContents = 'invalid xml'
        # A dummy 'reason' to pass to clientConnectionLost. We don't care
        # what it is.
        self.reason = failure.Failure(ConnectionDone())


    def test_parseResponseCallbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as a callback
        of L{_QueryFactory.parseResponse}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addCallback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.parseResponse(self.goodContents)
        return d


    def test_parseResponseErrbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as an errback
        of L{_QueryFactory.parseResponse}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addErrback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.parseResponse(self.badContents)
        return d


    def test_badStatusErrbackSafety(self):
        """
        We can safely call L{_QueryFactory.clientConnectionLost} as an errback
        of L{_QueryFactory.badStatus}.
        """
        d = self.queryFactory.deferred
        # The failure mode is that this callback raises an AlreadyCalled
        # error. We have to add it now so that it gets called synchronously
        # and triggers the race condition.
        d.addErrback(self.queryFactory.clientConnectionLost, self.reason)
        self.queryFactory.badStatus('status', 'message')
        return d

    def test_parseResponseWithoutData(self):
        """
        Some server can send a response without any data:
        L{_QueryFactory.parseResponse} should catch the error and call the
        result errback.
        """
        content = """
<methodResponse>
 <params>
  <param>
  </param>
 </params>
</methodResponse>"""
        d = self.queryFactory.deferred
        self.queryFactory.parseResponse(content)
        return self.assertFailure(d, IndexError)

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0094 ]--