!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache/2.2.16 (Debian). PHP/5.3.3-7+squeeze19 

uname -a: Linux mail.tri-specialutilitydistrict.com 2.6.32-5-amd64 #1 SMP Tue May 13 16:34:35 UTC
2014 x86_64
 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/usr/lib/python2.6/dist-packages/twisted/conch/test/   drwxr-xr-x
Free 129.73 GB of 142.11 GB (91.29%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_cftp.py (32.3 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- test-case-name: twisted.conch.test.test_cftp -*-
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
# See LICENSE file for details.

"""
Tests for L{twisted.conch.scripts.cftp}.
"""

import time, sys, os, operator, getpass, struct
from StringIO import StringIO

from twisted.conch.test.test_ssh import Crypto, pyasn1

_reason = None
if Crypto and pyasn1:
    try:
        from twisted.conch import unix
        from twisted.conch.scripts import cftp
        from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar
    except ImportError, e:
        # Python 2.3 compatibility fix
        sys.modules.pop("twisted.conch.unix", None)
        unix = None
        _reason = str(e)
        del e
else:
    unix = None


from twisted.python.fakepwd import UserDatabase
from twisted.trial.unittest import TestCase
from twisted.cred import portal
from twisted.internet import reactor, protocol, interfaces, defer, error
from twisted.internet.utils import getProcessOutputAndValue
from twisted.python import log
from twisted.conch import ls
from twisted.test.proto_helpers import StringTransport
from twisted.internet.task import Clock

from twisted.conch.test import test_ssh, test_conch
from twisted.conch.test.test_filetransfer import SFTPTestBase
from twisted.conch.test.test_filetransfer import FileTransferTestAvatar



class ListingTests(TestCase):
    """
    Tests for L{lsLine}, the function which generates an entry for a file or
    directory in an SFTP I{ls} command's output.
    """
    if getattr(time, 'tzset', None) is None:
        skip = "Cannot test timestamp formatting code without time.tzset"

    def setUp(self):
        """
        Patch the L{ls} module's time function so the results of L{lsLine} are
        deterministic.
        """
        self.now = 123456789
        def fakeTime():
            return self.now
        self.patch(ls, 'time', fakeTime)

        # Make sure that the timezone ends up the same after these tests as
        # it was before.
        if 'TZ' in os.environ:
            self.addCleanup(operator.setitem, os.environ, 'TZ', os.environ['TZ'])
            self.addCleanup(time.tzset)
        else:
            def cleanup():
                # os.environ.pop is broken!  Don't use it!  Ever!  Or die!
                try:
                    del os.environ['TZ']
                except KeyError:
                    pass
                time.tzset()
            self.addCleanup(cleanup)


    def _lsInTimezone(self, timezone, stat):
        """
        Call L{ls.lsLine} after setting the timezone to C{timezone} and return
        the result.
        """
        # Set the timezone to a well-known value so the timestamps are
        # predictable.
        os.environ['TZ'] = timezone
        time.tzset()
        return ls.lsLine('foo', stat)


    def test_oldFile(self):
        """
        A file with an mtime six months (approximately) or more in the past has
        a listing including a low-resolution timestamp.
        """
        # Go with 7 months.  That's more than 6 months.
        then = self.now - (60 * 60 * 24 * 31 * 7)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Apr 26  1973 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Apr 27  1973 foo')


    def test_oldSingleDigitDayOfMonth(self):
        """
        A file with a high-resolution timestamp which falls on a day of the
        month which can be represented by one decimal digit is formatted with
        one padding 0 to preserve the columns which come after it.
        """
        # A point about 7 months in the past, tweaked to fall on the first of a
        # month so we test the case we want to test.
        then = self.now - (60 * 60 * 24 * 31 * 7) + (60 * 60 * 24 * 5)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 May 01  1973 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 May 02  1973 foo')


    def test_newFile(self):
        """
        A file with an mtime fewer than six months (approximately) in the past
        has a listing including a high-resolution timestamp excluding the year.
        """
        # A point about three months in the past.
        then = self.now - (60 * 60 * 24 * 31 * 3)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Aug 28 17:33 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Aug 29 09:33 foo')


    def test_newSingleDigitDayOfMonth(self):
        """
        A file with a high-resolution timestamp which falls on a day of the
        month which can be represented by one decimal digit is formatted with
        one padding 0 to preserve the columns which come after it.
        """
        # A point about three months in the past, tweaked to fall on the first
        # of a month so we test the case we want to test.
        then = self.now - (60 * 60 * 24 * 31 * 3) + (60 * 60 * 24 * 4)
        stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))

        self.assertEqual(
            self._lsInTimezone('America/New_York', stat),
            '!---------    0 0        0               0 Sep 01 17:33 foo')
        self.assertEqual(
            self._lsInTimezone('Pacific/Auckland', stat),
            '!---------    0 0        0               0 Sep 02 09:33 foo')



class StdioClientTests(TestCase):
    """
    Tests for L{cftp.StdioClient}.
    """
    def setUp(self):
        """
        Create a L{cftp.StdioClient} hooked up to dummy transport and a fake
        user database.
        """
        class Connection:
            pass

        conn = Connection()
        conn.transport = StringTransport()
        conn.transport.localClosed = False

        self.client = cftp.StdioClient(conn)
        self.database = self.client._pwd = UserDatabase()

        # Intentionally bypassing makeConnection - that triggers some code
        # which uses features not provided by our dumb Connection fake.
        self.client.transport = StringTransport()


    def test_exec(self):
        """
        The I{exec} command runs its arguments locally in a child process
        using the user's shell.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
            sys.executable)

        d = self.client._dispatchCommand("exec print 1 + 2")
        d.addCallback(self.assertEquals, "3\n")
        return d


    def test_execWithoutShell(self):
        """
        If the local user has no shell, the I{exec} command runs its arguments
        using I{/bin/sh}.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '')

        d = self.client._dispatchCommand("exec echo hello")
        d.addCallback(self.assertEquals, "hello\n")
        return d


    def test_bang(self):
        """
        The I{exec} command is run for lines which start with C{"!"}.
        """
        self.database.addUser(
            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
            '/bin/sh')

        d = self.client._dispatchCommand("!echo hello")
        d.addCallback(self.assertEquals, "hello\n")
        return d


    def setKnownConsoleSize(self, width, height):
        """
        For the duration of this test, patch C{cftp}'s C{fcntl} module to return
        a fixed width and height.

        @param width: the width in characters
        @type width: C{int}
        @param height: the height in characters
        @type height: C{int}
        """
        import tty # local import to avoid win32 issues
        class FakeFcntl(object):
            def ioctl(self, fd, opt, mutate):
                if opt != tty.TIOCGWINSZ:
                    self.fail("Only window-size queries supported.")
                return struct.pack("4H", height, width, 0, 0)
        self.patch(cftp, "fcntl", FakeFcntl())


    def test_progressReporting(self):
        """
        L{StdioClient._printProgressBar} prints a progress description,
        including percent done, amount transferred, transfer rate, and time
        remaining, all based the given start time, the given L{FileWrapper}'s
        progress information and the reactor's current time.
        """
        # Use a short, known console width because this simple test doesn't need
        # to test the console padding.
        self.setKnownConsoleSize(10, 34)
        clock = self.client.reactor = Clock()
        wrapped = StringIO("x")
        wrapped.name = "sample"
        wrapper = cftp.FileWrapper(wrapped)
        wrapper.size = 1024 * 10
        startTime = clock.seconds()
        clock.advance(2.0)
        wrapper.total += 4096
        self.client._printProgressBar(wrapper, startTime)
        self.assertEquals(self.client.transport.value(),
                          "\rsample 40% 4.0kB 2.0kBps 00:03 ")


    def test_reportNoProgress(self):
        """
        L{StdioClient._printProgressBar} prints a progress description that
        indicates 0 bytes transferred if no bytes have been transferred and no
        time has passed.
        """
        self.setKnownConsoleSize(10, 34)
        clock = self.client.reactor = Clock()
        wrapped = StringIO("x")
        wrapped.name = "sample"
        wrapper = cftp.FileWrapper(wrapped)
        startTime = clock.seconds()
        self.client._printProgressBar(wrapper, startTime)
        self.assertEquals(self.client.transport.value(),
                          "\rsample  0% 0.0B 0.0Bps 00:00 ")



class FileTransferTestRealm:
    def __init__(self, testDir):
        self.testDir = testDir

    def requestAvatar(self, avatarID, mind, *interfaces):
        a = FileTransferTestAvatar(self.testDir)
        return interfaces[0], a, lambda: None


class SFTPTestProcess(protocol.ProcessProtocol):
    """
    Protocol for testing cftp. Provides an interface between Python (where all
    the tests are) and the cftp client process (which does the work that is
    being tested).
    """

    def __init__(self, onOutReceived):
        """
        @param onOutReceived: A L{Deferred} to be fired as soon as data is
        received from stdout.
        """
        self.clearBuffer()
        self.onOutReceived = onOutReceived
        self.onProcessEnd = None
        self._expectingCommand = None
        self._processEnded = False

    def clearBuffer(self):
        """
        Clear any buffered data received from stdout. Should be private.
        """
        self.buffer = ''
        self._linesReceived = []
        self._lineBuffer = ''

    def outReceived(self, data):
        """
        Called by Twisted when the cftp client prints data to stdout.
        """
        log.msg('got %s' % data)
        lines = (self._lineBuffer + data).split('\n')
        self._lineBuffer = lines.pop(-1)
        self._linesReceived.extend(lines)
        # XXX - not strictly correct.
        # We really want onOutReceived to fire after the first 'cftp>' prompt
        # has been received. (See use in TestOurServerCmdLineClient.setUp)
        if self.onOutReceived is not None:
            d, self.onOutReceived = self.onOutReceived, None
            d.callback(data)
        self.buffer += data
        self._checkForCommand()

    def _checkForCommand(self):
        prompt = 'cftp> '
        if self._expectingCommand and self._lineBuffer == prompt:
            buf = '\n'.join(self._linesReceived)
            if buf.startswith(prompt):
                buf = buf[len(prompt):]
            self.clearBuffer()
            d, self._expectingCommand = self._expectingCommand, None
            d.callback(buf)

    def errReceived(self, data):
        """
        Called by Twisted when the cftp client prints data to stderr.
        """
        log.msg('err: %s' % data)

    def getBuffer(self):
        """
        Return the contents of the buffer of data received from stdout.
        """
        return self.buffer

    def runCommand(self, command):
        """
        Issue the given command via the cftp client. Return a C{Deferred} that
        fires when the server returns a result. Note that the C{Deferred} will
        callback even if the server returns some kind of error.

        @param command: A string containing an sftp command.

        @return: A C{Deferred} that fires when the sftp server returns a
        result. The payload is the server's response string.
        """
        self._expectingCommand = defer.Deferred()
        self.clearBuffer()
        self.transport.write(command + '\n')
        return self._expectingCommand

    def runScript(self, commands):
        """
        Run each command in sequence and return a Deferred that fires when all
        commands are completed.

        @param commands: A list of strings containing sftp commands.

        @return: A C{Deferred} that fires when all commands are completed. The
        payload is a list of response strings from the server, in the same
        order as the commands.
        """
        sem = defer.DeferredSemaphore(1)
        dl = [sem.run(self.runCommand, command) for command in commands]
        return defer.gatherResults(dl)

    def killProcess(self):
        """
        Kill the process if it is still running.

        If the process is still running, sends a KILL signal to the transport
        and returns a C{Deferred} which fires when L{processEnded} is called.

        @return: a C{Deferred}.
        """
        if self._processEnded:
            return defer.succeed(None)
        self.onProcessEnd = defer.Deferred()
        self.transport.signalProcess('KILL')
        return self.onProcessEnd

    def processEnded(self, reason):
        """
        Called by Twisted when the cftp client process ends.
        """
        self._processEnded = True
        if self.onProcessEnd:
            d, self.onProcessEnd = self.onProcessEnd, None
            d.callback(None)


class CFTPClientTestBase(SFTPTestBase):
    def setUp(self):
        f = open('dsa_test.pub','w')
        f.write(test_ssh.publicDSA_openssh)
        f.close()
        f = open('dsa_test','w')
        f.write(test_ssh.privateDSA_openssh)
        f.close()
        os.chmod('dsa_test', 33152)
        f = open('kh_test','w')
        f.write('127.0.0.1 ' + test_ssh.publicRSA_openssh)
        f.close()
        return SFTPTestBase.setUp(self)

    def startServer(self):
        realm = FileTransferTestRealm(self.testDir)
        p = portal.Portal(realm)
        p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
        fac = test_ssh.ConchTestServerFactory()
        fac.portal = p
        self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")

    def stopServer(self):
        if not hasattr(self.server.factory, 'proto'):
            return self._cbStopServer(None)
        self.server.factory.proto.expectedLoseConnection = 1
        d = defer.maybeDeferred(
            self.server.factory.proto.transport.loseConnection)
        d.addCallback(self._cbStopServer)
        return d

    def _cbStopServer(self, ignored):
        return defer.maybeDeferred(self.server.stopListening)

    def tearDown(self):
        for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
            try:
                os.remove(f)
            except:
                pass
        return SFTPTestBase.tearDown(self)



class TestOurServerCmdLineClient(CFTPClientTestBase):

    def setUp(self):
        CFTPClientTestBase.setUp(self)

        self.startServer()
        cmds = ('-p %i -l testuser '
               '--known-hosts kh_test '
               '--user-authentications publickey '
               '--host-key-algorithms ssh-rsa '
               '-i dsa_test '
               '-a '
               '-v '
               '127.0.0.1')
        port = self.server.getHost().port
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
        log.msg('running %s %s' % (sys.executable, cmds))
        d = defer.Deferred()
        self.processProtocol = SFTPTestProcess(d)
        d.addCallback(lambda _: self.processProtocol.clearBuffer())
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
                             env=env)
        return d

    def tearDown(self):
        d = self.stopServer()
        d.addCallback(lambda _: self.processProtocol.killProcess())
        return d

    def _killProcess(self, ignored):
        try:
            self.processProtocol.transport.signalProcess('KILL')
        except error.ProcessExitedAlready:
            pass

    def runCommand(self, command):
        """
        Run the given command with the cftp client. Return a C{Deferred} that
        fires when the command is complete. Payload is the server's output for
        that command.
        """
        return self.processProtocol.runCommand(command)

    def runScript(self, *commands):
        """
        Run the given commands with the cftp client. Returns a C{Deferred}
        that fires when the commands are all complete. The C{Deferred}'s
        payload is a list of output for each command.
        """
        return self.processProtocol.runScript(commands)

    def testCdPwd(self):
        """
        Test that 'pwd' reports the current remote directory, that 'lpwd'
        reports the current local directory, and that changing to a
        subdirectory then changing to its parent leaves you in the original
        remote directory.
        """
        # XXX - not actually a unit test, see docstring.
        homeDir = os.path.join(os.getcwd(), self.testDir)
        d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd')
        d.addCallback(lambda xs: xs[:3] + xs[4:])
        d.addCallback(self.assertEqual,
                      [homeDir, os.getcwd(), '', homeDir])
        return d

    def testChAttrs(self):
        """
        Check that 'ls -l' output includes the access permissions and that
        this output changes appropriately with 'chmod'.
        """
        def _check(results):
            self.flushLoggedErrors()
            self.assertTrue(results[0].startswith('-rw-r--r--'))
            self.assertEqual(results[1], '')
            self.assertTrue(results[2].startswith('----------'), results[2])
            self.assertEqual(results[3], '')

        d = self.runScript('ls -l testfile1', 'chmod 0 testfile1',
                           'ls -l testfile1', 'chmod 644 testfile1')
        return d.addCallback(_check)
        # XXX test chgrp/own


    def testList(self):
        """
        Check 'ls' works as expected. Checks for wildcards, hidden files,
        listing directories and listing empty directories.
        """
        def _check(results):
            self.assertEqual(results[0], ['testDirectory', 'testRemoveFile',
                                          'testRenameFile', 'testfile1'])
            self.assertEqual(results[1], ['testDirectory', 'testRemoveFile',
                                          'testRenameFile', 'testfile1'])
            self.assertEqual(results[2], ['testRemoveFile', 'testRenameFile'])
            self.assertEqual(results[3], ['.testHiddenFile', 'testRemoveFile',
                                          'testRenameFile'])
            self.assertEqual(results[4], [''])
        d = self.runScript('ls', 'ls ../' + os.path.basename(self.testDir),
                           'ls *File', 'ls -a *File', 'ls -l testDirectory')
        d.addCallback(lambda xs: [x.split('\n') for x in xs])
        return d.addCallback(_check)


    def testHelp(self):
        """
        Check that running the '?' command returns help.
        """
        d = self.runCommand('?')
        d.addCallback(self.assertEqual,
                      cftp.StdioClient(None).cmd_HELP('').strip())
        return d

    def assertFilesEqual(self, name1, name2, msg=None):
        """
        Assert that the files at C{name1} and C{name2} contain exactly the
        same data.
        """
        f1 = file(name1).read()
        f2 = file(name2).read()
        self.failUnlessEqual(f1, f2, msg)


    def testGet(self):
        """
        Test that 'get' saves the remote file to the correct local location,
        that the output of 'get' is correct and that 'rm' actually removes
        the file.
        """
        # XXX - not actually a unit test
        expectedOutput = ("Transferred %s/%s/testfile1 to %s/test file2"
                          % (os.getcwd(), self.testDir, self.testDir))
        def _checkGet(result):
            self.assertTrue(result.endswith(expectedOutput))
            self.assertFilesEqual(self.testDir + '/testfile1',
                                  self.testDir + '/test file2',
                                  "get failed")
            return self.runCommand('rm "test file2"')

        d = self.runCommand('get testfile1 "%s/test file2"' % (self.testDir,))
        d.addCallback(_checkGet)
        d.addCallback(lambda _: self.failIf(
            os.path.exists(self.testDir + '/test file2')))
        return d


    def testWildcardGet(self):
        """
        Test that 'get' works correctly when given wildcard parameters.
        """
        def _check(ignored):
            self.assertFilesEqual(self.testDir + '/testRemoveFile',
                                  'testRemoveFile',
                                  'testRemoveFile get failed')
            self.assertFilesEqual(self.testDir + '/testRenameFile',
                                  'testRenameFile',
                                  'testRenameFile get failed')

        d = self.runCommand('get testR*')
        return d.addCallback(_check)


    def testPut(self):
        """
        Check that 'put' uploads files correctly and that they can be
        successfully removed. Also check the output of the put command.
        """
        # XXX - not actually a unit test
        expectedOutput = ('Transferred %s/testfile1 to %s/%s/test"file2'
                          % (self.testDir, os.getcwd(), self.testDir))
        def _checkPut(result):
            self.assertFilesEqual(self.testDir + '/testfile1',
                                  self.testDir + '/test"file2')
            self.failUnless(result.endswith(expectedOutput))
            return self.runCommand('rm "test\\"file2"')

        d = self.runCommand('put %s/testfile1 "test\\"file2"'
                            % (self.testDir,))
        d.addCallback(_checkPut)
        d.addCallback(lambda _: self.failIf(
            os.path.exists(self.testDir + '/test"file2')))
        return d


    def test_putOverLongerFile(self):
        """
        Check that 'put' uploads files correctly when overwriting a longer
        file.
        """
        # XXX - not actually a unit test
        f = file(os.path.join(self.testDir, 'shorterFile'), 'w')
        f.write("a")
        f.close()
        f = file(os.path.join(self.testDir, 'longerFile'), 'w')
        f.write("bb")
        f.close()
        def _checkPut(result):
            self.assertFilesEqual(self.testDir + '/shorterFile',
                                  self.testDir + '/longerFile')

        d = self.runCommand('put %s/shorterFile longerFile'
                            % (self.testDir,))
        d.addCallback(_checkPut)
        return d


    def test_putMultipleOverLongerFile(self):
        """
        Check that 'put' uploads files correctly when overwriting a longer
        file and you use a wildcard to specify the files to upload.
        """
        # XXX - not actually a unit test
        os.mkdir(os.path.join(self.testDir, 'dir'))
        f = file(os.path.join(self.testDir, 'dir', 'file'), 'w')
        f.write("a")
        f.close()
        f = file(os.path.join(self.testDir, 'file'), 'w')
        f.write("bb")
        f.close()
        def _checkPut(result):
            self.assertFilesEqual(self.testDir + '/dir/file',
                                  self.testDir + '/file')

        d = self.runCommand('put %s/dir/*'
                            % (self.testDir,))
        d.addCallback(_checkPut)
        return d


    def testWildcardPut(self):
        """
        What happens if you issue a 'put' command and include a wildcard (i.e.
        '*') in parameter? Check that all files matching the wildcard are
        uploaded to the correct directory.
        """
        def check(results):
            self.assertEqual(results[0], '')
            self.assertEqual(results[2], '')
            self.assertFilesEqual(self.testDir + '/testRemoveFile',
                                  self.testDir + '/../testRemoveFile',
                                  'testRemoveFile get failed')
            self.assertFilesEqual(self.testDir + '/testRenameFile',
                                  self.testDir + '/../testRenameFile',
                                  'testRenameFile get failed')

        d = self.runScript('cd ..',
                           'put %s/testR*' % (self.testDir,),
                           'cd %s' % os.path.basename(self.testDir))
        d.addCallback(check)
        return d


    def testLink(self):
        """
        Test that 'ln' creates a file which appears as a link in the output of
        'ls'. Check that removing the new file succeeds without output.
        """
        def _check(results):
            self.flushLoggedErrors()
            self.assertEqual(results[0], '')
            self.assertTrue(results[1].startswith('l'), 'link failed')
            return self.runCommand('rm testLink')

        d = self.runScript('ln testLink testfile1', 'ls -l testLink')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, '')
        return d


    def testRemoteDirectory(self):
        """
        Test that we can create and remove directories with the cftp client.
        """
        def _check(results):
            self.assertEqual(results[0], '')
            self.assertTrue(results[1].startswith('d'))
            return self.runCommand('rmdir testMakeDirectory')

        d = self.runScript('mkdir testMakeDirectory',
                           'ls -l testMakeDirector?')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, '')
        return d


    def test_existingRemoteDirectory(self):
        """
        Test that a C{mkdir} on an existing directory fails with the
        appropriate error, and doesn't log an useless error server side.
        """
        def _check(results):
            self.assertEquals(results[0], '')
            self.assertEquals(results[1],
                              'remote error 11: mkdir failed')

        d = self.runScript('mkdir testMakeDirectory',
                           'mkdir testMakeDirectory')
        d.addCallback(_check)
        return d


    def testLocalDirectory(self):
        """
        Test that we can create a directory locally and remove it with the
        cftp client. This test works because the 'remote' server is running
        out of a local directory.
        """
        d = self.runCommand('lmkdir %s/testLocalDirectory' % (self.testDir,))
        d.addCallback(self.assertEqual, '')
        d.addCallback(lambda _: self.runCommand('rmdir testLocalDirectory'))
        d.addCallback(self.assertEqual, '')
        return d


    def testRename(self):
        """
        Test that we can rename a file.
        """
        def _check(results):
            self.assertEqual(results[0], '')
            self.assertEqual(results[1], 'testfile2')
            return self.runCommand('rename testfile2 testfile1')

        d = self.runScript('rename testfile1 testfile2', 'ls testfile?')
        d.addCallback(_check)
        d.addCallback(self.assertEqual, '')
        return d



class TestOurServerBatchFile(CFTPClientTestBase):
    def setUp(self):
        CFTPClientTestBase.setUp(self)
        self.startServer()

    def tearDown(self):
        CFTPClientTestBase.tearDown(self)
        return self.stopServer()

    def _getBatchOutput(self, f):
        fn = self.mktemp()
        open(fn, 'w').write(f)
        port = self.server.getHost().port
        cmds = ('-p %i -l testuser '
                    '--known-hosts kh_test '
                    '--user-authentications publickey '
                    '--host-key-algorithms ssh-rsa '
                    '-i dsa_test '
                    '-a '
                    '-v -b %s 127.0.0.1') % (port, fn)
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
        log.msg('running %s %s' % (sys.executable, cmds))
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)

        self.server.factory.expectedLoseConnection = 1

        d = getProcessOutputAndValue(sys.executable, cmds, env=env)

        def _cleanup(res):
            os.remove(fn)
            return res

        d.addCallback(lambda res: res[0])
        d.addBoth(_cleanup)

        return d

    def testBatchFile(self):
        """Test whether batch file function of cftp ('cftp -b batchfile').
        This works by treating the file as a list of commands to be run.
        """
        cmds = """pwd
ls
exit
"""
        def _cbCheckResult(res):
            res = res.split('\n')
            log.msg('RES %s' % str(res))
            self.failUnless(res[1].find(self.testDir) != -1, repr(res))
            self.failUnlessEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
                                             'testRenameFile', 'testfile1'])

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d

    def testError(self):
        """Test that an error in the batch file stops running the batch.
        """
        cmds = """chown 0 missingFile
pwd
exit
"""
        def _cbCheckResult(res):
            self.failIf(res.find(self.testDir) != -1)

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d

    def testIgnoredError(self):
        """Test that a minus sign '-' at the front of a line ignores
        any errors.
        """
        cmds = """-chown 0 missingFile
pwd
exit
"""
        def _cbCheckResult(res):
            self.failIf(res.find(self.testDir) == -1)

        d = self._getBatchOutput(cmds)
        d.addCallback(_cbCheckResult)
        return d



class TestOurServerSftpClient(CFTPClientTestBase):
    """
    Test the sftp server against sftp command line client.
    """

    def setUp(self):
        CFTPClientTestBase.setUp(self)
        return self.startServer()


    def tearDown(self):
        return self.stopServer()


    def test_extendedAttributes(self):
        """
        Test the return of extended attributes by the server: the sftp client
        should ignore them, but still be able to parse the response correctly.

        This test is mainly here to check that
        L{filetransfer.FILEXFER_ATTR_EXTENDED} has the correct value.
        """
        fn = self.mktemp()
        open(fn, 'w').write("ls .\nexit")
        port = self.server.getHost().port

        oldGetAttr = FileTransferForTestAvatar._getAttrs
        def _getAttrs(self, s):
            attrs = oldGetAttr(self, s)
            attrs["ext_foo"] = "bar"
            return attrs

        self.patch(FileTransferForTestAvatar, "_getAttrs", _getAttrs)

        self.server.factory.expectedLoseConnection = True
        cmds = ('-o', 'IdentityFile=dsa_test',
                '-o', 'UserKnownHostsFile=kh_test',
                '-o', 'HostKeyAlgorithms=ssh-rsa',
                '-o', 'Port=%i' % (port,), '-b', fn, 'testuser@127.0.0.1')
        d = getProcessOutputAndValue("sftp", cmds)
        def check(result):
            self.assertEquals(result[2], 0)
            for i in ['testDirectory', 'testRemoveFile',
                      'testRenameFile', 'testfile1']:
                self.assertIn(i, result[0])
        return d.addCallback(check)



if unix is None or Crypto is None or pyasn1 is None or interfaces.IReactorProcess(reactor, None) is None:
    if _reason is None:
        _reason = "don't run w/o spawnProcess or PyCrypto or pyasn1"
    TestOurServerCmdLineClient.skip = _reason
    TestOurServerBatchFile.skip = _reason
    TestOurServerSftpClient.skip = _reason
    StdioClientTests.skip = _reason
else:
    from twisted.python.procutils import which
    if not which('sftp'):
        TestOurServerSftpClient.skip = "no sftp command-line client available"

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0097 ]--