本文整理汇总了Python中serial.Serial类的典型用法代码示例。如果您正苦于以下问题：Python Serial类的具体用法？Python Serial怎么用？Python Serial使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Serial类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: read_serial
def read_serial(ser: serial.Serial, q_out: queue.Queue, q_in: queue.Queue,
                stop_event: threading.Event):
    """Shuttle data between a serial port and two queues until stopped.

    On every pass at most one pending item from ``q_in`` is written to the
    port with a trailing newline, then one line is read from the port,
    converted with ``bytestostr`` and put on ``q_out``.  The loop ends when
    the port reports that it is closed or when ``stop_event`` is set.
    """
    while ser.isOpen():
        if stop_event.is_set():
            return
        has_outgoing = not q_in.empty()
        if has_outgoing:
            payload = q_in.get()
            ser.write(payload + b'\n')
        received = ser.readline()
        q_out.put(bytestostr(received))
开发者ID:MrLeeh,项目名称:jsonwatch,代码行数:7,代码来源:watcher.py
示例2: HashTagDisplay
class HashTagDisplay():
    """Show Twitter search results on a display attached to a serial port."""

    def __init__(self, delay=5, debug=False):
        """Store the settings and open the serial port.

        delay -- duration in seconds to allow a human to read the display
        debug -- print messages to the shell for debugging
        """
        self.delay = delay
        self.debug = debug
        self.ser = Serial('/dev/ttyS0', baudrate=9600, timeout=1)

    def search(self, hashtag):
        """Search for tweets with the specified hashtag and return the result."""
        twitter_search = Twitter(domain="search.twitter.com")
        return twitter_search.search(q=hashtag)

    def display(self, results):
        """Send each tweet in ``results`` to the display, one character at a time.

        ``results`` is a mapping whose ``'results'`` entry is an iterable of
        tweets providing ``'from_user'`` and ``'text'``.  A missing or empty
        entry displays nothing.
        """
        for tweet in results.get('results') or []:
            msg = "@" + tweet.get('from_user') + ": " + tweet.get('text')
            # Drop characters the display cannot show.  Joining explicitly
            # keeps the result a string on Python 3 as well, where filter()
            # would hand an iterator to re.sub().
            # http://stackoverflow.com/questions/8689795/python-remove-non-ascii-characters-but-leave-periods-and-spaces
            msg = ''.join(ch for ch in msg if ch in string.printable)
            # Collapse every run of whitespace (including newlines) to one space.
            msg = re.sub(r'\s+', ' ', msg)
            if self.debug:
                print("msg: [" + msg + "]")
            for ch in msg:
                # msg is pure printable ASCII here, so encoding cannot fail.
                self.ser.write(ch.encode('ascii'))
                # Short pause between characters, as in the original code.
                sleep(0.01)
            self.ser.write(b'\n')
            sleep(self.delay)
开发者ID:pdp7,项目名称:tweetypi,代码行数:27,代码来源:alatweet.py
示例3: run_host_test
def run_host_test(self, target_name, port,
                  duration, verbose=False):
    """
    Reset the target and collect its test log from the serial port by
    polling with a timeout.

    The function assumes the target is already flashed with the proper
    'test' binary.  Polling stops when ``duration`` seconds have elapsed,
    when the collected output contains '{end}', or on CTRL+C.

    NOTE(review): in the visible snippet the collected output is neither
    returned nor is the port closed; the snippet looks truncated, so nothing
    was added here -- confirm against the full source.
    """
    output = ""
    # Prepare serial for receiving data from target
    baud = 9600
    serial = Serial(port, timeout=1)
    serial.setBaudrate(baud)
    flush_serial(serial)
    # Resetting target and polling
    reset(target_name, serial, verbose=verbose)
    start_serial_timeout = time()
    try:
        while (time() - start_serial_timeout) < duration:
            test_output = serial.read(512)
            output += test_output
            flush_serial(serial)
            if '{end}' in output:
                break
    except KeyboardInterrupt:
        # The "except X, name" form and the print statement are Python 2
        # only; this spelling behaves the same on Python 2 and 3.
        print("CTRL+C break")
开发者ID:dreschpe,项目名称:mbed,代码行数:25,代码来源:singletest.py
示例4: testSendAndReceive4PortsLeader
def testSendAndReceive4PortsLeader(self):
    """Check leader/follower routing on a 4-port RS-485 PortLogger.

    Creates a PortLogger and one serial object per port.  The first port is
    the leader, the rest are followers.  A message written by a follower
    must reach the leader only; a message written by the leader must reach
    every follower.
    """
    plogger = PortLogger(number_of_ports=4, rs485=True)
    serials = []
    for port_name in plogger.port_names():
        new_serial = Serial(port_name)
        new_serial.timeout = 1
        serials.append(new_serial)
    leader = serials[0]
    followers = serials[1:]
    self.assertEqual(len(followers), len(serials) - 1)
    self.assertTrue(leader)  # Check that leader is not None or []
    plogger.start()
    try:
        for port in followers:
            message = ''.join([chr(randint(32, 127)) for _ in range(24)])
            port.write(message)
            # No follower may see what another follower sent.
            for port2 in followers:
                recvd = port2.read(len(message))
                self.assertEqual('', recvd)
            recvd = leader.read(len(message))
            self.assertEqual(recvd, message)
        message = ''.join([chr(randint(32, 127)) for _ in range(24)])
        leader.write(message)
        for port in followers:
            recvd = port.read(len(message))
            self.assertEqual(recvd, message)
    finally:
        # Stop the logger even when an assertion above fails.
        plogger.stop()
开发者ID:thrawn117,项目名称:mastertestbed,代码行数:34,代码来源:portlogger.py
示例5: available_ports
def available_ports():
    """Return a list of serial port objects found on this machine.

    USB serial adapters are collected first: every ``/dev`` entry whose name
    starts with ``ttyUSB`` is wrapped with ``serial_for_url`` without being
    opened.  After that, port numbers 0 through 255 are probed for old
    school serial ports; each one that opens is closed again and appended.
    Numbers that raise ``SerialException`` are skipped.
    """
    ports = [
        serial_for_url('/dev/' + entry, do_not_open=True)
        for entry in os.listdir('/dev')
        if entry.startswith('ttyUSB')
    ]
    for number in range(256):
        try:
            candidate = Serial(number)
            candidate.close()
        except SerialException:
            continue
        ports.append(candidate)
    return ports
开发者ID:ccraddock,项目名称:pyxid,代码行数:27,代码来源:serial_wrapper.py
示例6: __init__
class Connexion:
    """
    Manage the USB connexion.
    """

    def __init__(self, tty="/dev/ttyACM0"):
        """
        Open a serial connexion on the specified tty at 115200 baud.

        Prints a message and exits the process with status 1 when the port
        cannot be opened.
        """
        try:
            self.serial = Serial(tty, baudrate=115200)
        except SerialException:
            print("Can't open", tty)
            # Same effect as exit(1), without relying on the helper that the
            # site module injects for interactive sessions.
            raise SystemExit(1)

    def read(self):
        """
        Read a line and return it as text, dropping non-ASCII bytes.

        On OSError the read is retried after one second, for as long as it
        takes.  A loop is used so that a long outage cannot exhaust the
        recursion limit.
        """
        while True:
            try:
                return self.serial.readline().decode("ascii", "ignore")
            except OSError:
                sleep(1)

    def write(self, msg):
        """
        Send a string.

        The text is NFD-normalized first, so accented letters lose their
        combining marks instead of being dropped entirely when the result
        is encoded as ASCII.
        """
        self.serial.write(normalize("NFD", msg).encode("ascii", "ignore"))
开发者ID:mattwilliamson,项目名称:WaDeD,代码行数:27,代码来源:connexion.py
示例7: Copernicus
class Copernicus(object):
    """Byte-oriented wrapper around a serial connection to a Copernicus board."""

    def __init__(self, port=None, baudrate=9600):
        """Create the underlying serial connection for ``port`` and ``baudrate``."""
        self.serial = Serial(port, baudrate)

    def read(self):
        """Read one byte from the port and return it as an integer."""
        return ord(self.serial.read(1))

    def write(self, value):
        """Write ``value`` as a single byte and return the port's result.

        Raises ValueError when ``value`` is outside 0..255.
        """
        # bytes(bytearray([value])) gives the same one-byte string that
        # chr(value) gives on Python 2, and real bytes on Python 3, where
        # pySerial rejects text strings.
        return self.serial.write(bytes(bytearray([value])))

    def flush(self):
        """Discard whatever is waiting in the input buffer."""
        self.serial.flushInput()

    def set_autoupdates(self, peripheral):
        """Send the code that ``autoupdate_codes`` maps ``peripheral`` to."""
        self.write(autoupdate_codes[peripheral])

    def query(self, peripheral):
        """Send the code that ``query_codes`` maps ``peripheral`` to."""
        self.write(query_codes[peripheral])

    def set_dashboard_angle(self, angle):
        """Send ``angle`` (0-31) to the dashboard.

        Raises ValueError for any other value.  The original built the
        exception but never raised it, so bad angles were silently ignored.
        """
        if 0 <= angle <= 31:
            self.write(angle)
        else:
            raise ValueError("Dashboard angle out of range (0-31)")

    def reset_dashboard_angle(self):
        """Set the dashboard angle back to 0."""
        self.set_dashboard_angle(0)

    def led_on(self):
        """Send the byte 33."""
        self.write(33)

    def led_off(self):
        """Send the byte 32."""
        self.write(32)
开发者ID:bzurkowski,项目名称:morse-transmitter,代码行数:34,代码来源:copernicus.py
示例8: run
def run(port, max_height=223, max_width=123):
    """Continuously read sensor triples from ``port`` and derive height/width.

    Each line is expected to hold three comma-separated numbers.  They are
    multiplied by ``calibration`` (a module-level name defined outside this
    block) and used to compute ``height`` and ``width`` from ``max_height``
    and ``max_width``.  Lines with the wrong field count or with a field
    that is not a number are skipped as a whole.  The loop never returns.

    NOTE(review): height, width and rate are only consumed by the
    commented-out debug print below.
    """
    ser = Serial(port, 9600)
    prev = time()
    while True:
        raw = ser.readline()
        if not isinstance(raw, str):
            # Python 3 ports return bytes; on Python 2 this is already str.
            raw = raw.decode('ascii', 'ignore')
        line = raw.rstrip('\n\r')
        data = line.split(',')
        if len(data) != 3:
            continue
        try:
            # Convert all three fields before using any of them.  The
            # original "continue" only skipped the inner loop and let
            # unparsed strings reach the arithmetic below.
            ut, ul, ur = [calibration * float(field) for field in data]
        except ValueError:
            continue
        now = time()
        elapsed = now - prev
        # Two lines can arrive within the clock's resolution.
        rate = 1 / elapsed if elapsed > 0 else float('inf')
        width = 0
        height = max_height - ut
        if not (ul > 120 and ur > 120):
            width = max_width - ul - ur
        #print '{:5.2f} {:5.2f} {:5.2f} {:5.2f} {:4.2f}'.format(height, ul, ur, width, rate)
        prev = now
开发者ID:banacer,项目名称:door-wiz,代码行数:25,代码来源:display_data.py
示例9: __init__
def __init__(self, gui=None):
    """Create the single Printer instance.

    Raises RuntimeError when a Printer was already registered.  When no
    (truthy) ``gui`` is supplied, a ``dddumbui.DumbGui`` is created instead.
    """
    if Printer.__single:
        raise RuntimeError('A Printer already exists')
    Printer.__single = self

    self.gui = gui if gui else dddumbui.DumbGui()

    Serial.__init__(self)

    self.usbId = None

    # Command sequence number [1..255]
    self.lineNr = 1

    # The last (max. 256) commands sent, for command re-send
    self.lastCommands = {}

    # Retry counters on rx and tx errors
    self.rxErrors = self.txErrors = 0

    self.startTime = None

    self.curDirBits = 0
开发者ID:ErwinRieger,项目名称:ddprint,代码行数:29,代码来源:ddprinter.py
示例10: main
def main(argv):
    """Parse the command line, open the serial port and run the RSSI plot.

    ``argv`` is accepted for interface compatibility.  As before, the
    arguments themselves are read from ``sys.argv`` by ``parse_args()``.
    Exits with status 2 when the port cannot be opened or on CTRL+C.
    """
    loglevels = [logging.CRITICAL, logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    # The original passed argv as the first positional argument, which is
    # ArgumentParser's "prog" (the program name shown in the usage text).
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbosity', type=int, default=4,
                        help='set logging verbosity, 1=CRITICAL, 5=DEBUG')
    parser.add_argument('tty',
                        help='Serial port device file name')
    parser.add_argument('-b', '--baudrate', default=115200, type=int,
                        help='Serial port baudrate')
    args = parser.parse_args()
    # logging setup; clamp to 1..5 so an out-of-range value neither raises
    # IndexError nor wraps around to the other end of the list
    level_index = min(max(args.verbosity, 1), len(loglevels)) - 1
    logging.basicConfig(level=loglevels[level_index])
    # open serial port
    try:
        logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
        port = Serial(args.tty, args.baudrate, dsrdtr=0, rtscts=0, timeout=0.3)
    except IOError:
        # logging functions take no "file" keyword; passing one raised
        # TypeError instead of reporting the real problem.
        logging.critical("error opening serial port")
        sys.exit(2)
    try:
        app = rssi_plot()
        app.plot_rssi(port)
    except KeyboardInterrupt:
        sys.exit(2)
    finally:
        # Release the port on every exit path, not only on CTRL+C.
        port.close()
开发者ID:RIOT-OS,项目名称:applications,代码行数:27,代码来源:plot_rssi.py
示例11: __init__
def __init__(self, port, baudrate, wait_ready = True, timeout = True):
    """Open the Arduino serial link and optionally wait until it is ready.

    port       -- must be one of ARDUINO_PORTS, otherwise InvalidPort is raised
    baudrate   -- must be one of Serial.BAUDRATES, otherwise InvalidBaudRate
                  is raised
    wait_ready -- a number: sleep that many seconds; True: block until the
                  first non-empty read; False: do not wait at all
    timeout    -- forwarded unchanged to Serial.__init__
    """
    # Local import: the original called both time() and time.sleep(), which
    # cannot both work for a single binding of the name "time".
    import time as _time

    # the port is not one of the known Arduino ports
    if port not in ARDUINO_PORTS:
        raise InvalidPort(port)
    # the baudrate does not exist
    if baudrate not in Serial.BAUDRATES:
        raise InvalidBaudRate(baudrate)
    # call the parent constructor
    Serial.__init__(self, port, baudrate, timeout = timeout)
    # connection start: time at which the connection was opened
    self.connection_start = _time.time()
    # sleep for a short while
    if not isinstance(wait_ready, bool):
        _time.sleep(wait_ready)
    # wait for as long as there is no answer; the original tested
    # "elif True", which also waited when wait_ready was False
    elif wait_ready:
        while not self.read():
            continue
    # time needed to obtain the first answer
    self.connection_first_answer = _time.time() - self.connection_start
    print(self.connection_first_answer)
开发者ID:Kestrel67,项目名称:Python-Libraries,代码行数:28,代码来源:ArduinoInterface.py
示例12: __init__
def __init__(self, port, baudrate=115200, reset=False):
    """Open the serial connection, optionally hard-resetting the device first.

    With ``reset`` set, DTR is asserted for 0.25 s and released, the port is
    closed and reopened, and pending input is then discarded in 2-second
    rounds until nothing is waiting any more.  Any exception raised along
    the way is logged and re-raised wrapped in ``ConError``.
    """
    ConBase.__init__(self)
    try:
        self.serial = Serial(port, baudrate=baudrate, interCharTimeout=1)
        if reset:
            logging.info("Hard resetting device at port: %s", port)

            # Pulse DTR.
            self.serial.setDTR(True)
            time.sleep(0.25)
            self.serial.setDTR(False)

            # Reopen the port after the reset.
            self.serial.close()
            self.serial = Serial(port, baudrate=baudrate, interCharTimeout=1)

            # Drain the input until a 2-second round leaves nothing waiting.
            draining = True
            while draining:
                time.sleep(2.0)
                if self.inWaiting():
                    self.serial.read(self.inWaiting())
                else:
                    draining = False
    except Exception as exc:
        logging.error(exc)
        raise ConError(exc)
开发者ID:wendlers,项目名称:mpfshell,代码行数:25,代码来源:conserial.py
示例13: AcquisitionThread
class AcquisitionThread(Thread):
    """Background thread that reads fixed-size packets from a serial device.

    Every packet starts with START_CHARACTER and is followed by PACKET_SIZE
    bytes, which are wrapped in ``RocketData`` and put on the acquisition
    queue.
    """

    BAUDRATE = 57600
    START_CHARACTER = b's'
    PACKET_SIZE = 69

    def __init__(self, acquisition_queue):
        """Remember the queue that decoded packets are pushed to."""
        super(AcquisitionThread, self).__init__()
        self.acquisition_queue = acquisition_queue
        self.exit_flag = False

    def run(self):
        """Open the first port from ``serial_port()`` and read until stopped."""
        port_nb = serial_port()[0]
        self.device = Serial(port_nb, baudrate=self.BAUDRATE, timeout=0.2)
        try:
            while not self.exit_flag:
                c = self.device.read(1)
                if c != self.START_CHARACTER:
                    continue
                if not self._wait_for_packet():
                    # stop() was called while the packet was incomplete.
                    break
                data = self.device.read(self.PACKET_SIZE)
                rocket_data = RocketData(data)
                # Checksum validation is disabled; every packet is accepted.
                # checksum_validated = rocket_data.validateCheckSum()
                self.acquisition_queue.put(rocket_data)
        finally:
            # Release the port even when reading or decoding raises.
            self.device.close()

    def _wait_for_packet(self):
        """Wait for a full packet; return False if stop() is requested first."""
        while self.device.inWaiting() < self.PACKET_SIZE:
            # The original spun here without looking at exit_flag, so stop()
            # had no effect while a packet was incomplete.
            if self.exit_flag:
                return False
        return True

    def stop(self):
        """Ask ``run`` to finish; takes effect at its next flag check."""
        self.exit_flag = True
开发者ID:MaitreMenard,项目名称:GaulAvionique,代码行数:31,代码来源:serial_reader.py
示例14: uart1
def uart1(portName, q):
    """Take serial port names from a queue, read lines from them and react.

    portName -- label printed next to each positive reading
    q        -- queue from which the port to open is taken

    NOTE(review): the published snippet lost its indentation; the nesting
    below is a best-effort reconstruction -- confirm against the original.
    """
    # exitFlag, queueLock and workQueue are module-level names defined
    # outside this snippet.
    while not exitFlag:
        # NOTE(review): no matching release() is visible in this snippet.
        queueLock.acquire()
        # NOTE(review): emptiness is tested on workQueue but the item is
        # taken from q -- presumably the same queue; verify against caller.
        if not workQueue.empty():
            data = q.get()
            ser = Serial(
                data,
                baudrate=9600,
                bytesize=8,
                parity='N',
                stopbits=1,
                timeout=None)
            while True:
                # Decode the line and drop its last two characters
                # (presumably a trailing "\r\n" -- TODO confirm).
                line = ser.readline(ser.inWaiting()).decode('utf-8')[:-2]
                if line:
                    t = line.split(",")
                    # The sixth comma-separated field is read as a number.
                    # NOTE(review): the '520'/'620'/'110' lines tested below
                    # have no sixth field, so t[5] would raise IndexError
                    # for them -- confirm the intended nesting.
                    line2 = float(t[5])
                    if line2 > 0:
                        print(portName, line2)
                    if line == '520':
                        subprocess.call(["xte", "key Up"])
                    elif line == '620':
                        subprocess.call(["xte", "key Down"])
                    elif line == '110':
                        # '110' ends the reading loop for this port.
                        break
            ser.close()
开发者ID:reyueei,项目名称:voltCtrlFLC,代码行数:26,代码来源:serialReading+-+Copy.py
示例15: Rubinstein
class Rubinstein(object):
    """Send commands to a printer over serial and collect the replies.

    NOTE(review): the string comparisons below assume the port returns text
    (Python 2 ``str``); a port that returns ``bytes`` would never match.
    """

    def __init__(self, port='/dev/ttyUSB0', baudrate=250000, timeout=0.25):
        """Open the port and wait for the firmware's start-up line."""
        self.serial = Serial(port=port, baudrate=baudrate, timeout=timeout)
        # FIXME don't know why initialization stops with this line
        while self.serial.readline() != 'echo:SD init fail\n':
            sleep(0.2)
        self.busy = False

    def command(self, cmd, timeout=float('inf')):
        """Send ``cmd`` and return everything received before the 'ok' line.

        Every non-empty reply line is echoed to stdout and appended to the
        returned string.  Raises RuntimeError("busy") when another command
        is in flight or unread input is pending, and RuntimeError when no
        'ok' line arrives within ``timeout`` seconds.
        """
        import sys  # local import, only needed for echoing reply lines

        # FIXME needs mutex!
        if self.busy or self.serial.read():
            raise RuntimeError("busy")
        self.busy = True
        try:
            self.serial.write("{}\n".format(cmd))
            response = ''
            start = time()
            while time() - start < timeout:
                sleep(0.2)
                line = self.serial.readline()
                if line == 'ok\n':
                    return response
                if len(line) > 0:
                    # Same output as the Python 2 "print line," for
                    # newline-terminated lines.
                    sys.stdout.write(line)
                    response += line
            raise RuntimeError(
                "timeout when waiting for command {}".format(cmd))
        finally:
            # Always clear the flag; otherwise one timeout or I/O error made
            # every later command fail with "busy".
            self.busy = False
开发者ID:lumbric,项目名称:kossel_config,代码行数:29,代码来源:run_g29_tests.py
示例16: __init__
class GPS:
    """Keep the most recent position decoded from $GPRMC sentences."""

    def __init__(self, port_name, baud_rate, logger):
        """Open the serial port and start with a (0, 0) position."""
        self.logger = logger
        self.logger.write("Starting GPS Communications")
        self.serial = Serial(port_name, baud_rate)
        self.lat = 0
        self.long = 0

    def get_GPS(self):
        """Block until a latitude is available, then return (lat, long).

        This method does not read the port itself; ``read_data`` has to be
        called from elsewhere while it waits.
        """
        while (self.lat == 0.0):
            self.logger.write("Waiting for GPS")
            time.sleep(1)
        return (self.lat, self.long)

    def read_data(self):
        """Read one pending line and update the position from a $GPRMC sentence.

        A sentence with status 'A' sets lat/long in decimal degrees; any
        other status resets both to 0.0.  Truncated or garbled sentences are
        ignored and leave the last position untouched.

        NOTE(review): the hemisphere fields (N/S, E/W) are not applied, as
        in the original -- confirm whether signed coordinates are wanted.
        """
        if self.serial.inWaiting() <= 0:
            return
        data = self.serial.readline().rstrip()
        if data[0:6] != "$GPRMC":
            return
        fields = data.split(',')
        try:
            if fields[2] == 'A':
                # ddmm.mmmm -> degrees; ASSUMES TWO DIGIT LATITUDE
                lat = int(fields[3][0:2]) + (float(fields[3][2:]) / 60.0)
                # dddmm.mmmm -> degrees; ASSUMES THREE DIGIT LONGITUDE
                lon = int(fields[5][0:3]) + (float(fields[5][3:]) / 60.0)
            else:
                lat = 0.0
                lon = 0.0
        except (IndexError, ValueError):
            # Line noise produces short or non-numeric sentences; skip them
            # rather than crash the reader.
            return
        # Assign both together so a bad longitude cannot leave a fresh
        # latitude paired with a stale longitude.
        self.lat = lat
        self.long = lon

    def close(self):
        """Close the serial port."""
        self.serial.close()
开发者ID:petemarshall77,项目名称:RoboMagellan-2016,代码行数:30,代码来源:gps.py
示例17: __init__
def __init__(self, port=None, baudrate=None):
    """Open the connection to the machine.

    port     -- device name, "AUTO" to probe every port from serialList(),
                or "VIRTUAL" for a VirtualPrinter; defaults to the
                "serial_port" profile preference
    baudrate -- defaults to the "serial_baud" profile preference

    ``self.serial`` stays None when no port could be opened.  Failures are
    printed and not raised.
    """
    if port is None:
        port = profile.getPreference("serial_port")
    if baudrate is None:
        baudrate = int(profile.getPreference("serial_baud"))
    self.serial = None
    if port == "AUTO":
        programmer = stk500v2.Stk500v2()
        for port in serialList():
            try:
                print("Connecting to: %s %i" % (port, baudrate))
                # A port counts as valid only if the programmer connects.
                programmer.connect(port)
                programmer.close()
                time.sleep(1)
                self.serial = Serial(port, baudrate, timeout=2)
                break
            except ispBase.IspError as e:
                print("Error while connecting to %s %i" % (port, baudrate))
                print(e)
            except Exception:
                # Best effort: report and try the next port.  "Exception"
                # instead of a bare except lets CTRL+C and SystemExit through.
                print("Unexpected error while connecting to serial port:%s %s"
                      % (port, sys.exc_info()[0]))
        programmer.close()
    elif port == "VIRTUAL":
        self.serial = VirtualPrinter()
    else:
        try:
            self.serial = Serial(port, baudrate, timeout=2)
        except Exception:
            print("Unexpected error while connecting to serial port:%s %s"
                  % (port, sys.exc_info()[0]))
    print(self.serial)
开发者ID:greenarrow,项目名称:Cura,代码行数:31,代码来源:machineCom.py
示例18: SerialSensorReader
class SerialSensorReader(SensorReader):
    """SensorReader that takes its readings from a serial port."""

    def __init__(self,commConfig,sensorName,rowType=(),rateSec=60,db=None,log=None):
        """Forward everything to SensorReader and set the line format defaults."""
        SensorReader.__init__(self,commConfig,sensorName,rowType,rateSec,db,log)
        self.eol = "\n"
        self.delimiterPattern = "\\S+"
        self.delimiter = " "

    def initSensor(self):
        """Open the port described by ``commConfig`` and reset the sensor."""
        from serial import Serial
        self.serialP = Serial(**self.commConfig)
        self.resetSensor()

    def stopSensor(self):
        """Close the serial port."""
        self.serialP.close()

    def getReadings(self):
        """Return all pending lines, each prefixed with the bucket timestamp.

        The timestamp is the current time rounded down to a multiple of
        ``rateSec``.  Reading stops early if ``readlineR`` yields None.
        """
        readings = []
        # Floor division keeps this an integer bucket on Python 3 as well;
        # "/" would give a float there.
        ts = str((int(time()) // self.rateSec) * self.rateSec)
        while self.serialP.inWaiting():
            line = readlineR(self.serialP, self.eol)
            if line is None:
                # The original filtered None only after join(), which would
                # already have raised TypeError.  Stop instead of spinning
                # on input that cannot be completed.
                break
            readings.append(self.delimiter.join([ts, line]))
        return readings

    def toRow(self, reading):
        """Split ``reading`` into its whitespace-separated fields."""
        row = re.findall(self.delimiterPattern, reading.strip())
        row = [x.strip() for x in row]
        return row

    def resetSensor(self):
        """Announce the reset and return one line read from the port."""
        print("Reseting Sensor:" + self.sensorName)
        return readlineR(self.serialP, self.eol)
开发者ID:sethfowler,项目名称:particlecamp,代码行数:27,代码来源:dataservice.py
示例19: SerialInstrument
class SerialInstrument(object):
    """Context manager around a serial-attached instrument.

    The port is opened on entering the ``with`` block and closed on leaving
    it.  ``read`` and ``write`` are only usable inside the block.
    """

    def __init__(self, port, baud=19200):
        """Remember the connection settings; nothing is opened yet."""
        self.port = port
        self.baud = baud

    def read(self, num=None):
        """Block until data is available and return it.

        With a non-zero ``num``, wait for that many bytes and return exactly
        those; otherwise wait for any input and return the port's default
        read.
        """
        if num:
            while self.instr.inWaiting() < num:
                pass
            return self.instr.read(num)
        while not self.instr.inWaiting():
            pass
        return self.instr.read()

    def write(self, data):
        """Write ``data`` to the instrument."""
        self.instr.write(data)

    def __enter__(self):
        """Open the port and return this object."""
        self.instr = Serial(self.port, self.baud)
        # Serial() already opens the port when it is given a port name, and
        # calling open() on an open port raises SerialException.
        if not self.instr.isOpen():
            self.instr.open()
        return self

    def __exit__(self, type, value, tb):
        """Close the port; exceptions from the block are not suppressed."""
        self.instr.close()
开发者ID:allevitan,项目名称:instruments,代码行数:25,代码来源:instruments.py
示例20: testSendAndReceive2Ports
def testSendAndReceive2Ports(self):
    """Check that two ports joined by a PortLogger exchange data both ways.

    Creates a PortLogger and one serial object per port, then verifies that
    a message written on one port is read on the other and is not echoed
    back to the sender.
    """
    from serial import Serial
    from random import randint
    plogger = PortLogger()
    serials = []
    for port_name in plogger.port_names():
        new_serial = Serial(port_name)
        new_serial.timeout = 1
        serials.append(new_serial)
    plogger.start()
    try:
        message1 = "This is a test message"
        message2 = ''.join([chr(randint(32, 127)) for i in range(24)])
        serials[0].write(message1)
        recvd = serials[1].read(len(message1))
        self.assertEqual(message1, recvd)
        # The sender must not receive its own message.
        recvd = serials[0].read(len(message1))
        self.assertEqual('', recvd)
        serials[1].write(message2)
        recvd = serials[0].read(len(message2))
        self.assertEqual(message2, recvd)
        recvd = serials[1].read(len(message2))
        self.assertEqual('', recvd)
    finally:
        # Stop the logger even when an assertion above fails.
        plogger.stop()
开发者ID:madhulika12,项目名称:Virtual-Industrial-Control-System-Testbed-Developement,代码行数:30,代码来源:portlogger.py
注：本文中的serial.Serial类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论