Viewing file: serialjava.py (7.68 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!jython #Python Serial Port Extension for Win32, Linux, BSD, Jython #module for serial IO for Jython and JavaComm #see __init__.py # #(C) 2002-2003 Chris Liechti <cliechti@gmx.net> # this is distributed under a free software license, see license.txt
import javax.comm from serialutil import *
VERSION = "$Revision: 1.10 $".split()[1] #extract CVS version
def device(portnumber): """Turn a port number into a device name""" enum = javax.comm.CommPortIdentifier.getPortIdentifiers() ports = [] while enum.hasMoreElements(): el = enum.nextElement() if el.getPortType() == javax.comm.CommPortIdentifier.PORT_SERIAL: ports.append(el) return ports[portnumber].getName()
class Serial(SerialBase): """Serial port class, implemented with javax.comm and thus usable with jython and the appropriate java extension.""" def open(self): """Open port with current settings. This may throw a SerialException if the port cannot be opened.""" if self._port is None: raise SerialException("Port must be configured before it can be used.") if type(self._port) == type(''): #strings are taken directly portId = javax.comm.CommPortIdentifier.getPortIdentifier(self._port) else: portId = javax.comm.CommPortIdentifier.getPortIdentifier(device(self._port)) #numbers are transformed to a comportid obj try: self.sPort = portId.open("python serial module", 10) except Exception, msg: self.sPort = None raise SerialException("Could not open port: %s" % msg) self._reconfigurePort() self._instream = self.sPort.getInputStream() self._outstream = self.sPort.getOutputStream() self._isOpen = True
def _reconfigurePort(self): """Set commuication parameters on opened port.""" if not self.sPort: raise SerialException("Can only operate on a valid port handle") self.sPort.enableReceiveTimeout(30) if self._bytesize == FIVEBITS: jdatabits = javax.comm.SerialPort.DATABITS_5 elif self._bytesize == SIXBITS: jdatabits = javax.comm.SerialPort.DATABITS_6 elif self._bytesize == SEVENBITS: jdatabits = javax.comm.SerialPort.DATABITS_7 elif self._bytesize == EIGHTBITS: jdatabits = javax.comm.SerialPort.DATABITS_8 else: raise ValueError("unsupported bytesize: %r" % self._bytesize) if self._stopbits == STOPBITS_ONE: jstopbits = javax.comm.SerialPort.STOPBITS_1 elif stopbits == STOPBITS_ONE_HALVE: self._jstopbits = javax.comm.SerialPort.STOPBITS_1_5 elif self._stopbits == STOPBITS_TWO: jstopbits = javax.comm.SerialPort.STOPBITS_2 else: raise ValueError("unsupported number of stopbits: %r" % self._stopbits)
if self._parity == PARITY_NONE: jparity = javax.comm.SerialPort.PARITY_NONE elif self._parity == PARITY_EVEN: jparity = javax.comm.SerialPort.PARITY_EVEN elif self._parity == PARITY_ODD: jparity = javax.comm.SerialPort.PARITY_ODD #~ elif self._parity == PARITY_MARK: #~ jparity = javax.comm.SerialPort.PARITY_MARK #~ elif self._parity == PARITY_SPACE: #~ jparity = javax.comm.SerialPort.PARITY_SPACE else: raise ValueError("unsupported parity type: %r" % self._parity)
jflowin = jflowout = 0 if self._rtscts: jflowin |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_IN jflowout |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_OUT if self._xonxoff: jflowin |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_IN jflowout |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_OUT self.sPort.setSerialPortParams(baudrate, jdatabits, jstopbits, jparity) self.sPort.setFlowControlMode(jflowin | jflowout) if self._timeout >= 0: self.sPort.enableReceiveTimeout(self._timeout*1000) else: self.sPort.disableReceiveTimeout()
def close(self): """Close port""" if self._isOpen: if self.sPort: self._instream.close() self._outstream.close() self.sPort.close() self.sPort = None self._isOpen = False
def makeDeviceName(self, port): return device(port)
# - - - - - - - - - - - - - - - - - - - - - - - -
def inWaiting(self): """Return the number of characters currently in the input buffer.""" if not self.sPort: raise portNotOpenError return self._instream.available()
def read(self, size=1): """Read size bytes from the serial port. If a timeout is set it may return less characters as requested. With no timeout it will block until the requested number of bytes is read.""" if not self.sPort: raise portNotOpenError read = '' if size > 0: while len(read) < size: x = self._instream.read() if x == -1: if self.timeout >= 0: break else: read = read + chr(x) return read
def write(self, data): """Output the given string over the serial port.""" if not self.sPort: raise portNotOpenError self._outstream.write(data)
def flushInput(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.sPort: raise portNotOpenError self._instream.skip(self._instream.available())
def flushOutput(self): """Clear output buffer, aborting the current output and discarding all that is in the buffer.""" if not self.sPort: raise portNotOpenError self._outstream.flush()
def sendBreak(self, duration=0.25): """Send break condition.""" if not self.sPort: raise portNotOpenError self.sPort.sendBreak(duration*1000.0)
def setRTS(self, level=1): """Set terminal status line: Request To Send""" if not self.sPort: raise portNotOpenError self.sPort.setRTS(level) def setDTR(self, level=1): """Set terminal status line: Data Terminal Ready""" if not self.sPort: raise portNotOpenError self.sPort.setDTR(level)
def getCTS(self): """Read terminal status line: Clear To Send""" if not self.sPort: raise portNotOpenError self.sPort.isCTS()
def getDSR(self): """Read terminal status line: Data Set Ready""" if not self.sPort: raise portNotOpenError self.sPort.isDSR()
def getRI(self): """Read terminal status line: Ring Indicator""" if not self.sPort: raise portNotOpenError self.sPort.isRI()
def getCD(self): """Read terminal status line: Carrier Detect""" if not self.sPort: raise portNotOpenError self.sPort.isCD()
if __name__ == '__main__': s = Serial(0, baudrate=19200, #baudrate bytesize=EIGHTBITS, #number of databits parity=PARITY_EVEN, #enable parity checking stopbits=STOPBITS_ONE, #number of stopbits timeout=3, #set a timeout value, None for waiting forever xonxoff=0, #enable software flow control rtscts=0, #enable RTS/CTS flow control ) s.setRTS(1) s.setDTR(1) s.flushInput() s.flushOutput() s.write('hello') print repr(s.read(5)) print s.inWaiting() del s
|