# Copyright (c) 2009-2010 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorFDSet}.
"""

__metaclass__ = type

import socket

from twisted.internet.interfaces import IReactorFDSet
from twisted.internet.abstract import FileDescriptor
from twisted.internet.test.reactormixins import ReactorBuilder

# twisted.internet.tcp nicely defines some names with proper values on
# several different platforms.
from twisted.internet.tcp import EINPROGRESS, EWOULDBLOCK
class ReactorFDSetTestsBuilder(ReactorBuilder):
    """
    Builder defining tests relating to L{IReactorFDSet}.
    """
    requiredInterface = IReactorFDSet

    def _connectedPair(self):
        """
        Return the two sockets which make up a new TCP connection.

        A listening socket is bound to an ephemeral port on C{127.0.0.1} and
        a non-blocking client socket is connected to it.  Every socket
        created here is closed by a cleanup registered with C{addCleanup}.

        @return: A two-tuple C{(client, server)} of the connected sockets.
        """
        serverSocket = socket.socket()
        # Register the cleanup before bind/listen so the socket is closed
        # even if one of those calls raises.
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)

        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            # A non-blocking connect is allowed to report that it is still
            # in progress; any other socket error is a test failure.
            self.assertIn(e.args[0], (EINPROGRESS, EWOULDBLOCK))
        except Exception:
            self.fail(
                "Connect should have succeeded or raised "
                "EINPROGRESS or EWOULDBLOCK")
        # accept() also returns the peer address, which is not needed here.
        server = serverSocket.accept()[0]
        self.addCleanup(server.close)

        return client, server

    def _simpleSetup(self):
        """
        Build a reactor and a L{FileDescriptor} backed by the client end of
        a fresh TCP connection.

        @return: A three-tuple C{(reactor, fd, server)} where C{fd} is a
            L{FileDescriptor} whose C{fileno} is that of the client socket
            and C{server} is the socket at the other end of the connection.
        """
        reactor = self.buildReactor()

        client, server = self._connectedPair()

        fd = FileDescriptor(reactor)
        fd.fileno = client.fileno

        return reactor, fd, server

    def test_addReader(self):
        """
        C{reactor.addReader()} accepts an L{IReadDescriptor} provider and
        calls its C{doRead} method when there may be data available on its
        C{fileno}.
        """
        reactor, fd, server = self._simpleSetup()

        fd.doRead = reactor.stop
        reactor.addReader(fd)
        server.sendall(b'x')

        # The reactor will only stop if it calls fd.doRead.
        self.runReactor(reactor)
        # Nothing to assert, just be glad we got this far.

    def test_removeReader(self):
        """
        C{reactor.removeReader()} accepts an L{IReadDescriptor} provider
        previously passed to C{reactor.addReader()} and causes it to no
        longer be monitored for input events.
        """
        reactor, fd, server = self._simpleSetup()

        def fail():
            self.fail("doRead should not be called")
        fd.doRead = fail

        reactor.addReader(fd)
        reactor.removeReader(fd)
        server.sendall(b'x')

        # Give the reactor two timed event passes to notice that there's I/O
        # (if it is incorrectly watching for I/O).
        reactor.callLater(0, reactor.callLater, 0, reactor.stop)

        self.runReactor(reactor)
        # Getting here means the right thing happened probably.

    def test_addWriter(self):
        """
        C{reactor.addWriter()} accepts an L{IWriteDescriptor} provider and
        calls its C{doWrite} method when it may be possible to write to its
        C{fileno}.
        """
        reactor, fd, server = self._simpleSetup()

        fd.doWrite = reactor.stop
        reactor.addWriter(fd)

        # The reactor will only stop if it calls fd.doWrite.
        self.runReactor(reactor)
        # Getting here is great.

    def _getFDTest(self, kind):
        """
        Helper for getReaders and getWriters tests.

        @param kind: Either C{'Reader'} or C{'Writer'}; selects which of the
            reactor's C{get<kind>s}, C{add<kind>} and C{remove<kind>}
            methods are exercised.
        """
        reactor = self.buildReactor()
        get = getattr(reactor, 'get' + kind + 's')
        add = getattr(reactor, 'add' + kind)
        remove = getattr(reactor, 'remove' + kind)

        client, server = self._connectedPair()

        # Nothing has been registered yet.
        self.assertNotIn(client, get())
        self.assertNotIn(server, get())

        # Only the object that was added is reported.
        add(client)
        self.assertIn(client, get())
        self.assertNotIn(server, get())

        # Removal takes it back out again.
        remove(client)
        self.assertNotIn(client, get())
        self.assertNotIn(server, get())

    def test_getReaders(self):
        """
        L{IReactorFDSet.getReaders} reflects the additions and removals made
        with L{IReactorFDSet.addReader} and L{IReactorFDSet.removeReader}.
        """
        self._getFDTest('Reader')

    def test_removeWriter(self):
        """
        C{reactor.removeWriter()} accepts an L{IWriteDescriptor} provider
        previously passed to C{reactor.addWriter()} and causes it to no
        longer be monitored for outputability.
        """
        reactor, fd, server = self._simpleSetup()

        def fail():
            self.fail("doWrite should not be called")
        fd.doWrite = fail

        reactor.addWriter(fd)
        reactor.removeWriter(fd)

        # Give the reactor two timed event passes to notice that there's I/O
        # (if it is incorrectly watching for I/O).
        reactor.callLater(0, reactor.callLater, 0, reactor.stop)

        self.runReactor(reactor)
        # Getting here means the right thing happened probably.

    def test_getWriters(self):
        """
        L{IReactorFDSet.getWriters} reflects the additions and removals made
        with L{IReactorFDSet.addWriter} and L{IReactorFDSet.removeWriter}.
        """
        self._getFDTest('Writer')

    def test_removeAll(self):
        """
        C{reactor.removeAll()} removes all registered L{IReadDescriptor}
        providers and all registered L{IWriteDescriptor} providers and
        returns them.
        """
        # _simpleSetup builds the reactor itself; building another one here
        # first would only create an extra reactor that is never used.
        reactor, fd, server = self._simpleSetup()

        fd.doRead = lambda: self.fail("doRead should not be called")
        fd.doWrite = lambda: self.fail("doWrite should not be called")

        server.sendall(b'x')

        reactor.addReader(fd)
        reactor.addWriter(fd)

        removed = reactor.removeAll()

        # Give the reactor two timed event passes to notice that there's I/O
        # (if it is incorrectly watching for I/O).
        reactor.callLater(0, reactor.callLater, 0, reactor.stop)

        self.runReactor(reactor)
        # Getting here means the right thing happened probably.

        self.assertEqual(removed, [fd])
# Publish whatever makeTestCaseClasses() returns as module-level names so the
# test runner can find them (presumably one concrete test case class per
# reactor implementation -- see ReactorBuilder to confirm).
globals().update(ReactorFDSetTestsBuilder.makeTestCaseClasses())