# File: _posixstdio.py
# -*- test-case-name: twisted.test.test_stdio -*-
"""Standard input/out/err support.
Future Plans::

    support for stderr, perhaps
    Rewrite to use the reactor instead of an ad-hoc mechanism for connecting
        protocols to transport.

Maintainer: James Y Knight
"""
import warnings, errno, os from zope.interface import implements
from twisted.internet import process, error, interfaces from twisted.python import log, failure
class PipeAddress(object):
    """
    An address object declared to provide L{interfaces.IAddress}.

    It defines no attributes or methods of its own; L{StandardIO.getPeer}
    and L{StandardIO.getHost} each return a fresh instance of it.
    """
    implements(interfaces.IAddress)
class StandardIO(object):
    """
    A transport connecting a protocol to a pair of file descriptors, by
    default descriptor 0 for reading and descriptor 1 for writing.

    The actual I/O is delegated to a L{process.ProcessReader} and a
    L{process.ProcessWriter}, which report back to this object through
    L{childDataReceived} and L{childConnectionLost}.

    @ivar protocol: the protocol attached to this transport; reset to
        C{None} by L{connectionLost}.
    @ivar _reader: the L{process.ProcessReader} for the input descriptor, or
        C{None} when the read half is not (or no longer) available.
    @ivar _writer: the L{process.ProcessWriter} for the output descriptor, or
        C{None} when the write half is not (or no longer) available.
    @ivar disconnected: C{True} once L{connectionLost} has run.
    @ivar disconnecting: C{True} once L{loseConnection} has been called.
    """
    implements(interfaces.ITransport, interfaces.IProducer,
               interfaces.IConsumer, interfaces.IHalfCloseableDescriptor)

    _reader = None
    _writer = None
    disconnected = False
    disconnecting = False

    def __init__(self, proto, stdin=0, stdout=1, reactor=None):
        """
        Attach C{proto} to the given descriptors and start I/O.

        @param proto: the protocol to connect; its C{makeConnection} is
            called with this transport once both halves are set up.
        @param stdin: file descriptor handed to the L{process.ProcessReader}.
        @param stdout: file descriptor handed to the
            L{process.ProcessWriter}.
        @param reactor: the reactor handed to the reader and the writer.  If
            C{None} (the default), the global reactor is imported and used.

        @raise RuntimeError: if starting the writer fails with an
            C{IOError} whose errno is C{EPERM}.
        """
        if reactor is None:
            # Imported lazily so that merely importing this module does not
            # import the global reactor.
            from twisted.internet import reactor
        self.protocol = proto

        self._writer = process.ProcessWriter(reactor, self, 'write', stdout)
        try:
            self._writer.startReading()
        except IOError as e:
            if e.errno == errno.EPERM:
                # epoll will reject certain file descriptors by raising
                # EPERM. Most commonly, this means stdout was redirected to
                # a regular file.
                raise RuntimeError(
                    "This reactor does not support this type of file "
                    "descriptor (fd %d, mode %d) (for example, epollreactor "
                    "does not support normal files. See #4429)." % (
                        stdout, os.fstat(stdout).st_mode))
            # Any other IOError is passed through untouched.
            raise

        self._reader = process.ProcessReader(reactor, self, 'read', stdin)
        self._reader.startReading()
        self.protocol.makeConnection(self)

    # ITransport

    # XXX Actually, see #3597.
    def loseWriteConnection(self):
        """
        Ask the writer, if there still is one, to close the write half.
        """
        if self._writer is not None:
            self._writer.loseConnection()

    def write(self, data):
        """
        Pass C{data} to the writer.  The data is silently dropped when the
        write half is gone.
        """
        if self._writer is not None:
            self._writer.write(data)

    def writeSequence(self, data):
        """
        Pass a sequence of data to the writer.  The data is silently dropped
        when the write half is gone.
        """
        if self._writer is not None:
            self._writer.writeSequence(data)

    def loseConnection(self):
        """
        Begin shutting down both halves and remember that a full disconnect
        was requested, so that L{_writeConnectionLost} finishes the teardown.
        """
        self.disconnecting = True

        if self._writer is not None:
            self._writer.loseConnection()
        if self._reader is not None:
            # Don't loseConnection, because we don't want to SIGPIPE it.
            self._reader.stopReading()

    def getPeer(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()

    def getHost(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()

    # Callbacks from process.ProcessReader/ProcessWriter
    def childDataReceived(self, fd, data):
        """
        Deliver C{data} to the protocol.  C{fd} is ignored.
        """
        self.protocol.dataReceived(data)

    def childConnectionLost(self, fd, reason):
        """
        Handle the loss of one half of the connection.

        @param fd: C{'read'} for the read half; any other value is treated
            as the write half.
        @param reason: a failure whose C{value} describes why the half was
            lost.
        """
        if self.disconnected:
            return

        # Exact class comparison: only ConnectionDone itself (not a
        # subclass) is handled as a clean, possibly half-way, close.
        if reason.value.__class__ == error.ConnectionDone:
            # Normal close
            if fd == 'read':
                self._readConnectionLost(reason)
            else:
                self._writeConnectionLost(reason)
        else:
            self.connectionLost(reason)

    def connectionLost(self, reason):
        """
        Tear down whatever is left of the connection and notify the protocol.

        @param reason: passed on to the remaining reader/writer and to the
            protocol's C{connectionLost}.
        """
        self.disconnected = True

        # Make sure to cleanup the other half
        _reader = self._reader
        _writer = self._writer
        protocol = self.protocol
        self._reader = self._writer = None
        self.protocol = None

        if _writer is not None and not _writer.disconnected:
            _writer.connectionLost(reason)

        if _reader is not None and not _reader.disconnected:
            _reader.connectionLost(reason)

        try:
            protocol.connectionLost(reason)
        except:
            # Any exception raised by the protocol is logged rather than
            # propagated to the caller.
            log.err()

    def _writeConnectionLost(self, reason):
        """
        React to a clean close of the write half.

        If L{loseConnection} was called, the whole connection is torn down.
        Otherwise the protocol is told about the half-close, provided it can
        be adapted to L{interfaces.IHalfCloseableProtocol}; if it cannot,
        nothing further happens here.
        """
        self._writer = None
        if self.disconnecting:
            self.connectionLost(reason)
            return

        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if p:
            try:
                p.writeConnectionLost()
            except:
                # Log the error, then drop the connection using a Failure
                # built from the exception currently being handled.
                log.err()
                self.connectionLost(failure.Failure())

    def _readConnectionLost(self, reason):
        """
        React to a clean close of the read half.

        The protocol is told about the half-close if it can be adapted to
        L{interfaces.IHalfCloseableProtocol}; otherwise the whole connection
        is torn down with C{reason}.
        """
        self._reader = None
        p = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if p:
            try:
                p.readConnectionLost()
            except:
                # Log the error, then drop the connection using a Failure
                # built from the exception currently being handled.
                log.err()
                self.connectionLost(failure.Failure())
        else:
            self.connectionLost(reason)

    # IConsumer
    def registerProducer(self, producer, streaming):
        """
        Register C{producer} with the writer, or tell it to stop producing
        right away when the write half is gone.
        """
        if self._writer is None:
            producer.stopProducing()
        else:
            self._writer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """
        Unregister the current producer from the writer, if there is a
        writer.
        """
        if self._writer is not None:
            self._writer.unregisterProducer()

    # IProducer
    def stopProducing(self):
        """
        Stop producing by calling L{loseConnection}.
        """
        self.loseConnection()

    def pauseProducing(self):
        """
        Pause the reader, if there is one.
        """
        if self._reader is not None:
            self._reader.pauseProducing()

    def resumeProducing(self):
        """
        Resume the reader, if there is one.
        """
        if self._reader is not None:
            self._reader.resumeProducing()

    # Stupid compatibility:
    def closeStdin(self):
        """Compatibility only, don't use. Same as loseWriteConnection."""
        warnings.warn("This function is deprecated, use loseWriteConnection instead.",
                      category=DeprecationWarning, stacklevel=2)
        self.loseWriteConnection()

    def stopReading(self):
        """Compatibility only, don't use. Call pauseProducing."""
        self.pauseProducing()

    def startReading(self):
        """Compatibility only, don't use. Call resumeProducing."""
        self.resumeProducing()
|