| Viewing file:  serialcli.py (9.22 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#! python#
 # Backend for .NET/Mono (IronPython), .NET >= 2
 #
 # This file is part of pySerial. https://github.com/pyserial/pyserial
 # (C) 2008-2015 Chris Liechti <cliechti@gmx.net>
 #
 # SPDX-License-Identifier:    BSD-3-Clause
 
 import clr
 import System
 import System.IO.Ports
 from serial.serialutil import *
 
 
 #~ def device(portnum):
 #~ """Turn a port number into a device name"""
 #~ return System.IO.Ports.SerialPort.GetPortNames()[portnum]
 
 
 # must invoke function with byte array, make a helper to convert strings
 # to byte arrays
 sab = System.Array[System.Byte]
 def as_byte_array(string):
 return sab([ord(x) for x in string])  # XXX will require adaption when run with a 3.x compatible IronPython
 
 class Serial(SerialBase):
 """Serial port implementation for .NET/Mono."""
 
 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
 9600, 19200, 38400, 57600, 115200)
 
 def open(self):
 """\
 Open port with current settings. This may throw a SerialException
 if the port cannot be opened.
 """
 if self._port is None:
 raise SerialException("Port must be configured before it can be used.")
 if self.is_open:
 raise SerialException("Port is already open.")
 try:
 self._port_handle = System.IO.Ports.SerialPort(self.portstr)
 except Exception as msg:
 self._port_handle = None
 raise SerialException("could not open port %s: %s" % (self.portstr, msg))
 
 self._reconfigurePort()
 self._port_handle.Open()
 self.is_open = True
 if not self._dsrdtr:
 self._update_dtr_state()
 if not self._rtscts:
 self._update_rts_state()
 self.reset_input_buffer()
 
 def _reconfigurePort(self):
 """Set communication parameters on opened port."""
 if not self._port_handle:
 raise SerialException("Can only operate on a valid port handle")
 
 #~ self._port_handle.ReceivedBytesThreshold = 1
 
 if self._timeout is None:
 self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
 else:
 self._port_handle.ReadTimeout = int(self._timeout*1000)
 
 # if self._timeout != 0 and self._interCharTimeout is not None:
 # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
 
 if self._write_timeout is None:
 self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
 else:
 self._port_handle.WriteTimeout = int(self._write_timeout*1000)
 
 
 # Setup the connection info.
 try:
 self._port_handle.BaudRate = self._baudrate
 except IOError as e:
 # catch errors from illegal baudrate settings
 raise ValueError(str(e))
 
 if self._bytesize == FIVEBITS:
 self._port_handle.DataBits     = 5
 elif self._bytesize == SIXBITS:
 self._port_handle.DataBits     = 6
 elif self._bytesize == SEVENBITS:
 self._port_handle.DataBits     = 7
 elif self._bytesize == EIGHTBITS:
 self._port_handle.DataBits     = 8
 else:
 raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
 
 if self._parity == PARITY_NONE:
 self._port_handle.Parity       = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
 elif self._parity == PARITY_EVEN:
 self._port_handle.Parity       = System.IO.Ports.Parity.Even
 elif self._parity == PARITY_ODD:
 self._port_handle.Parity       = System.IO.Ports.Parity.Odd
 elif self._parity == PARITY_MARK:
 self._port_handle.Parity       = System.IO.Ports.Parity.Mark
 elif self._parity == PARITY_SPACE:
 self._port_handle.Parity       = System.IO.Ports.Parity.Space
 else:
 raise ValueError("Unsupported parity mode: %r" % self._parity)
 
 if self._stopbits == STOPBITS_ONE:
 self._port_handle.StopBits     = System.IO.Ports.StopBits.One
 elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
 self._port_handle.StopBits     = System.IO.Ports.StopBits.OnePointFive
 elif self._stopbits == STOPBITS_TWO:
 self._port_handle.StopBits     = System.IO.Ports.StopBits.Two
 else:
 raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
 
 if self._rtscts and self._xonxoff:
 self._port_handle.Handshake  = System.IO.Ports.Handshake.RequestToSendXOnXOff
 elif self._rtscts:
 self._port_handle.Handshake  = System.IO.Ports.Handshake.RequestToSend
 elif self._xonxoff:
 self._port_handle.Handshake  = System.IO.Ports.Handshake.XOnXOff
 else:
 self._port_handle.Handshake  = getattr(System.IO.Ports.Handshake, 'None')   # reserved keyword in Py3k
 
 #~ def __del__(self):
 #~ self.close()
 
 def close(self):
 """Close port"""
 if self.is_open:
 if self._port_handle:
 try:
 self._port_handle.Close()
 except System.IO.Ports.InvalidOperationException:
 # ignore errors. can happen for unplugged USB serial devices
 pass
 self._port_handle = None
 self.is_open = False
 
 #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
 
 @property
 def in_waiting(self):
 """Return the number of characters currently in the input buffer."""
 if not self._port_handle:
 raise portNotOpenError
 return self._port_handle.BytesToRead
 
 def read(self, size=1):
 """\
 Read size bytes from the serial port. If a timeout is set it may
 return less characters as requested. With no timeout it will block
 until the requested number of bytes is read.
 """
 if not self._port_handle:
 raise portNotOpenError
 # must use single byte reads as this is the only way to read
 # without applying encodings
 data = bytearray()
 while size:
 try:
 data.append(self._port_handle.ReadByte())
 except System.TimeoutException as e:
 break
 else:
 size -= 1
 return bytes(data)
 
 def write(self, data):
 """Output the given string over the serial port."""
 if not self._port_handle:
 raise portNotOpenError
 #~ if not isinstance(data, (bytes, bytearray)):
 #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
 try:
 # must call overloaded method with byte array argument
 # as this is the only one not applying encodings
 self._port_handle.Write(as_byte_array(data), 0, len(data))
 except System.TimeoutException as e:
 raise writeTimeoutError
 return len(data)
 
 def reset_input_buffer(self):
 """Clear input buffer, discarding all that is in the buffer."""
 if not self._port_handle:
 raise portNotOpenError
 self._port_handle.DiscardInBuffer()
 
 def reset_output_buffer(self):
 """\
 Clear output buffer, aborting the current output and
 discarding all that is in the buffer.
 """
 if not self._port_handle:
 raise portNotOpenError
 self._port_handle.DiscardOutBuffer()
 
 def _update_break_state(self):
 """
 Set break: Controls TXD. When active, to transmitting is possible.
 """
 if not self._port_handle:
 raise portNotOpenError
 self._port_handle.BreakState = bool(self._break_state)
 
 def _update_rts_state(self):
 """Set terminal status line: Request To Send"""
 if not self._port_handle:
 raise portNotOpenError
 self._port_handle.RtsEnable = bool(self._rts_state)
 
 def _update_dtr_state(self):
 """Set terminal status line: Data Terminal Ready"""
 if not self._port_handle:
 raise portNotOpenError
 self._port_handle.DtrEnable = bool(self._dtr_state)
 
 @property
 def cts(self):
 """Read terminal status line: Clear To Send"""
 if not self._port_handle:
 raise portNotOpenError
 return self._port_handle.CtsHolding
 
 @property
 def dsr(self):
 """Read terminal status line: Data Set Ready"""
 if not self._port_handle:
 raise portNotOpenError
 return self._port_handle.DsrHolding
 
 @property
 def ri(self):
 """Read terminal status line: Ring Indicator"""
 if not self._port_handle:
 raise portNotOpenError
 #~ return self._port_handle.XXX
 return False #XXX an error would be better
 
 @property
 def cd(self):
 """Read terminal status line: Carrier Detect"""
 if not self._port_handle:
 raise portNotOpenError
 return self._port_handle.CDHolding
 
 # - - platform specific - - - -
 # none
 
 
 # Nur Testfunktion!!
 if __name__ == '__main__':
 import sys
 
 s = Serial(0)
 sys.stdio.write('%s\n' % s)
 
 s = Serial()
 sys.stdio.write('%s\n' % s)
 
 
 s.baudrate = 19200
 s.databits = 7
 s.close()
 s.port = 0
 s.open()
 sys.stdio.write('%s\n' % s)
 
 
 |