• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python serial.Serial类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中serial.Serial的典型用法代码示例。如果您正苦于以下问题:Python Serial类的具体用法?Python Serial怎么用?Python Serial使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Serial类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: read_serial

def read_serial(ser: serial.Serial, q_out: queue.Queue, q_in: queue.Queue,
                stop_event: threading.Event):
    while (ser.isOpen()):
        if stop_event.is_set(): return
        if not q_in.empty():
            ser.write(q_in.get() + b'\n')
        q_out.put(bytestostr(ser.readline()))
开发者ID:MrLeeh,项目名称:jsonwatch,代码行数:7,代码来源:watcher.py


示例2: HashTagDisplay

class HashTagDisplay():
    def __init__(self, delay=5, debug=False):
        # duration in seconds to allow human to read display
        self.delay = delay
        # print messages to shell for debugging 
        self.debug = debug
        self.ser = Serial('/dev/ttyS0', baudrate=9600, timeout=1)

    def search(self, hashtag):
           # search for tweets with specified hashtag
           twitter_search = Twitter(domain="search.twitter.com")
           return twitter_search.search(q=hashtag)
    
    def display(self, results):
           # Display each tweet in the twitter search results
           for tweet in results.get('results'):
               msg = "@" + tweet.get('from_user') + ": " + tweet.get('text')
               #http://stackoverflow.com/questions/8689795/python-remove-non-ascii-characters-but-leave-periods-and-spaces
               msg = filter(lambda x: x in string.printable, msg)
               msg = re.sub('\s+',' ', msg)
               if self.debug == True:
                   print "msg: [" + msg + "]"
               for c in msg:
                   self.ser.write(c)
                   sleep(0.01)
               self.ser.write('\n');
               sleep(self.delay)
开发者ID:pdp7,项目名称:tweetypi,代码行数:27,代码来源:alatweet.py


示例3: run_host_test

 def run_host_test(self, target_name, port,
                   duration, verbose=False):
     """
     Functions resets target and grabs by timeouted pooling test log
     via serial port.
     Function assumes target is already flashed with proper 'test' binary.
     """
     output = ""
     # Prepare serial for receiving data from target
     baud = 9600
     serial = Serial(port, timeout=1)
     serial.setBaudrate(baud)
     flush_serial(serial)
     # Resetting target and pooling
     reset(target_name, serial, verbose=verbose)
     start_serial_timeour = time()
     try:
         while (time() - start_serial_timeour) < duration:
             test_output = serial.read(512)
             output += test_output
             flush_serial(serial)
             if '{end}' in output:
                 break
     except KeyboardInterrupt, _:
         print "CTRL+C break"
开发者ID:dreschpe,项目名称:mbed,代码行数:25,代码来源:singletest.py


示例4: testSendAndReceive4PortsLeader

    def testSendAndReceive4PortsLeader(self):
        """testSendAndReceive4Ports: Creates a PortLogger, serial objects that 
            connect to the PortLogger, and verifies that reads and writes work"""

        plogger = PortLogger(number_of_ports=4, rs485=True)
        serials = []
        for port_name in plogger.port_names():
            new_serial = Serial(port_name)
            new_serial.timeout = 1
            serials.append(new_serial)

        leader = serials[0]
        followers = serials[1:]

        self.assertEquals(len(followers), len(serials)-1)
        self.assertTrue(leader) #Check that leader is not None or []

        plogger.start()
        for port in followers:
            message = ''.join([chr(randint(32,127)) for _ in range(24)])
            port.write(message)
            for port2 in followers:
                recvd = port2.read(len(message))
                self.assertEqual('', recvd)
            recvd = leader.read(len(message))
            self.assertEquals(recvd, message)
        
        message = ''.join([chr(randint(32,127)) for _ in range(24)])
        leader.write(message)
        for port in followers:
            recvd = port.read(len(message))
            self.assertEquals(recvd, message)

        plogger.stop()
开发者ID:thrawn117,项目名称:mastertestbed,代码行数:34,代码来源:portlogger.py


示例5: available_ports

    def available_ports():
        usb_serial_ports = filter(
            (lambda x: x.startswith('ttyUSB')),
            os.listdir('/dev'))

        ports = []
        for p in usb_serial_ports:
            ports.append(serial_for_url('/dev/'+p, do_not_open=True))


        """
        also check for old school serial ports, my hope is that this will enable
        both USB serial adapters and standard serial ports

        Scans COM1 through COM255 for available serial ports

        returns a list of available ports
        """
        for i in range(256):
            try:
                p = Serial(i)
                p.close()
                ports.append(p)
            except SerialException:
                pass

        return ports
开发者ID:ccraddock,项目名称:pyxid,代码行数:27,代码来源:serial_wrapper.py


示例6: __init__

class Connexion:
    """
    Manage the USB connexion.
    """

    def __init__(self, tty = "/dev/ttyACM0"):
        """
        Open a serial connexion on specified tty, with a 1152000
        baudrate.
        """
        try:
            self.serial = Serial(tty, baudrate = 115200)
        except SerialException:
            print("Can't open", tty)
            exit(1)

    def read(self):
        """ Read a line. """
        try:
            return self.serial.readline().decode("ascii", "ignore")
        except OSError:
            sleep(1)
            return self.read()

    def write(self, msg):
        """ Send a string. """
        self.serial.write(normalize("NFD", msg).encode("ascii", "ignore"))
开发者ID:mattwilliamson,项目名称:WaDeD,代码行数:27,代码来源:connexion.py


示例7: Copernicus

class Copernicus(object):

    def __init__(self, port=None, baudrate=9600):
        self.serial = Serial(port, baudrate)

    def read(self):
        return ord(self.serial.read(1))

    def write(self, value):
        return self.serial.write(chr(value))

    def flush(self):
        self.serial.flushInput()

    def set_autoupdates(self, peripheral):
        self.write(autoupdate_codes[peripheral])

    def query(self, peripheral):
        self.write(query_codes[peripheral])

    def set_dashboard_angle(self, angle):
        if 0 <= angle <= 31:
            self.write(angle)
        else:
            ValueError("Dashboard angle out of range (0-31)")

    def reset_dashboard_angle(self):
        self.set_dashboard_angle(0)

    def led_on(self):
        self.write(33)

    def led_off(self):
        self.write(32)
开发者ID:bzurkowski,项目名称:morse-transmitter,代码行数:34,代码来源:copernicus.py


示例8: run

def run(port, max_height=223, max_width=123):
    ser = Serial(port, 9600)
    prev = time()
    while True:
        line = ser.readline().rstrip('\n\r')
        data = line.split(',')

        if len(data) != 3:
            continue
        for i in range(3):
            try:
                data[i] = calibration * float(data[i])
            except ValueError:
                continue
        now = time()
        rate = 1 / (now - prev)
        ut = data[0]
        ul = data[1]
        ur = data[2]
        width = 0
        height = max_height - ut
        if not (ul > 120 and ur > 120):
            width = max_width - ul - ur
        #print '{:5.2f} {:5.2f} {:5.2f} {:5.2f} {:4.2f}'.format(height, ul, ur, width, rate)
        prev = now
开发者ID:banacer,项目名称:door-wiz,代码行数:25,代码来源:display_data.py


示例9: __init__

    def __init__(self, gui=None):

        if Printer.__single:
            raise RuntimeError('A Printer already exists')

        Printer.__single = self

        if gui:
            self.gui = gui
        else:
            self.gui = dddumbui.DumbGui()

        Serial.__init__(self)

        self.usbId = None

        # Command sequence number [1..255]
        self.lineNr = 1

        # The last (max. 256) commands sent, for command re-send
        self.lastCommands = {}

        # Retry counter on rx and tx errors
        self.rxErrors = 0
        self.txErrors = 0

        self.startTime = None

        self.curDirBits = 0
开发者ID:ErwinRieger,项目名称:ddprint,代码行数:29,代码来源:ddprinter.py


示例10: main

def main(argv):
    loglevels = [logging.CRITICAL, logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    parser = argparse.ArgumentParser(argv)
    parser.add_argument('-v', '--verbosity', type=int, default=4,
                        help='set logging verbosity, 1=CRITICAL, 5=DEBUG')
    parser.add_argument('tty',
                        help='Serial port device file name')
    parser.add_argument('-b', '--baudrate', default=115200, type=int,
                        help='Serial port baudrate')
    args = parser.parse_args()
    # logging setup
    logging.basicConfig(level=loglevels[args.verbosity-1])

    # open serial port
    try:
        logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
        port = Serial(args.tty, args.baudrate, dsrdtr=0, rtscts=0, timeout=0.3)
    except IOError:
        logging.critical("error opening serial port", file=sys.stderr)
        sys.exit(2)

    try:
        app = rssi_plot()
        app.plot_rssi(port)
    except KeyboardInterrupt:
        port.close()
        sys.exit(2)
开发者ID:RIOT-OS,项目名称:applications,代码行数:27,代码来源:plot_rssi.py


示例11: __init__

  def __init__(self, port, baudrate, wait_ready = True, timeout = True):
    
    # si le port n'est pas utilisé
    if port not in ARDUINO_PORTS:
      raise InvalidPort(port)
      
    # si le baudrate n'existe pas
    if baudrate not in Serial.BAUDRATES:
      raise InvalidBaudRate(baudrate)
    
    # on appel le constructeur parent
    Serial.__init__(self, port, baudrate, timeout = timeout)
    
    # connection start : temps de début de connexion
    self.connection_start = time()

    # on dort un petit moment
    if not isinstance(wait_ready, bool):
      time.sleep(wait_ready)
     
    # temps qu'on n'a pas de réponse on attend
    elif True:
      while not self.read():
        continue
       
      # temps nécéssaire pour obtenir la première connexion
      self.connection_first_answer = time() - self.connection_start
      print(self.connection_first_answer)
开发者ID:Kestrel67,项目名称:Python-Libraries,代码行数:28,代码来源:ArduinoInterface.py


示例12: __init__

    def __init__(self, port, baudrate=115200, reset=False):
        ConBase.__init__(self)

        try:
            self.serial = Serial(port, baudrate=baudrate, interCharTimeout=1)

            if reset:
                logging.info("Hard resetting device at port: %s" % port)

                self.serial.setDTR(True)
                time.sleep(0.25)
                self.serial.setDTR(False)

                self.serial.close()
                self.serial = Serial(port, baudrate=baudrate, interCharTimeout=1)

                while True:
                    time.sleep(2.0)
                    if not self.inWaiting():
                        break
                    self.serial.read(self.inWaiting())

        except Exception as e:
            logging.error(e)
            raise ConError(e)
开发者ID:wendlers,项目名称:mpfshell,代码行数:25,代码来源:conserial.py


示例13: AcquisitionThread

class AcquisitionThread(Thread):
    BAUDRATE = 57600
    START_CHARACTER = b's'
    PACKET_SIZE = 69

    def __init__(self, acquisition_queue):
        super(AcquisitionThread, self).__init__()
        self.acquisition_queue = acquisition_queue
        self.exit_flag = False

    def run(self):
        port_nb = serial_port()[0]
        self.device = Serial(port_nb, baudrate=self.BAUDRATE, timeout=0.2)
        while not self.exit_flag:
            c = self.device.read(1)
            if c == self.START_CHARACTER:
                while self.device.inWaiting() < self.PACKET_SIZE:
                    pass
                data = self.device.read(self.PACKET_SIZE)

                rocket_data = RocketData(data)
                # checksum_validated = rocket_data.validateCheckSum()
                checksum_validated = True
                if checksum_validated:
                    self.acquisition_queue.put(rocket_data)
        self.device.close()



    def stop(self):
        self.exit_flag = True
开发者ID:MaitreMenard,项目名称:GaulAvionique,代码行数:31,代码来源:serial_reader.py


示例14: uart1

def uart1(portName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            ser = Serial(
                data,
                baudrate=9600,
                bytesize=8,
                parity='N',
                stopbits=1,
                timeout=None)
            while True:
                line = ser.readline(ser.inWaiting()).decode('utf-8')[:-2]
                if line:
                    t = line.split(",")
                    line2 = float(t[5])
                    if line2 > 0:
                        print(portName, line2)
                        if line == '520':
                            subprocess.call(["xte", "key Up"])
                        elif line == '620':
                            subprocess.call(["xte", "key Down"])
                        elif line == '110':
                            break
            ser.close()
开发者ID:reyueei,项目名称:voltCtrlFLC,代码行数:26,代码来源:serialReading+-+Copy.py


示例15: Rubinstein

class Rubinstein(object):
    def __init__(self, port='/dev/ttyUSB0', baudrate=250000, timeout=0.25):
        self.serial = Serial(port=port, baudrate=baudrate, timeout=timeout)
        # FIXME don't know why initialization stops with this line
        while self.serial.readline() != 'echo:SD init fail\n':
            sleep(0.2)
        self.busy = False

    def command(self, cmd, timeout=float('inf')):
        # FIXME needs mutex!
        if self.busy or self.serial.read():
            raise RuntimeError("busy")
        self.busy = True
        self.serial.write("{}\n".format(cmd))
        response = ''
        start = time()
        while time() - start < timeout:
            sleep(0.2)
            line = self.serial.readline()
            if line == 'ok\n':
                # FIXME missing try/finally for busy=False
                self.busy = False
                return response
            if len(line) > 0:
                print line,
            response += line
        else:
            raise RuntimeError(
                "timeout when waiting for command {}".format(cmd))
开发者ID:lumbric,项目名称:kossel_config,代码行数:29,代码来源:run_g29_tests.py


示例16: __init__

class GPS:

    def __init__(self, port_name, baud_rate, logger):
        self.logger = logger
        self.logger.write("Starting GPS Communications")
        self.serial = Serial(port_name, baud_rate)

        self.lat = 0
        self.long = 0

    def get_GPS(self):
        while (self.lat == 0.0):
            self.logger.write("Waiting for GPS")
            time.sleep(1)
        return (self.lat, self.long)

    def read_data(self):
        if self.serial.inWaiting() > 0:
            data = self.serial.readline().rstrip()
            if data[0:6] == "$GPRMC":
                fields = data.split(',')
                if fields[2] == 'A':
                  self.lat = int(fields[3][0:2]) + (float(fields[3][2:]) / 60.0) #ASSUMES TWO DIGIT LATITUDE
                  self.long = int(fields[5][0:3]) + (float(fields[5][3:]) / 60.0) #ASSUMES THREE DIGIT LONGITUDE
                else:
                  self.lat = 0.0
                  self.long = 0.0

    def close(self):
        self.serial.close()
开发者ID:petemarshall77,项目名称:RoboMagellan-2016,代码行数:30,代码来源:gps.py


示例17: __init__

 def __init__(self, port=None, baudrate=None):
     if port == None:
         port = profile.getPreference("serial_port")
     if baudrate == None:
         baudrate = int(profile.getPreference("serial_baud"))
     self.serial = None
     if port == "AUTO":
         programmer = stk500v2.Stk500v2()
         for port in serialList():
             try:
                 print "Connecting to: %s %i" % (port, baudrate)
                 programmer.connect(port)
                 programmer.close()
                 time.sleep(1)
                 self.serial = Serial(port, baudrate, timeout=2)
                 break
             except ispBase.IspError as (e):
                 print "Error while connecting to %s %i" % (port, baudrate)
                 print e
                 pass
             except:
                 print "Unexpected error while connecting to serial port:" + port, sys.exc_info()[0]
         programmer.close()
     elif port == "VIRTUAL":
         self.serial = VirtualPrinter()
     else:
         try:
             self.serial = Serial(port, baudrate, timeout=2)
         except:
             print "Unexpected error while connecting to serial port:" + port, sys.exc_info()[0]
     print self.serial
开发者ID:greenarrow,项目名称:Cura,代码行数:31,代码来源:machineCom.py


示例18: SerialSensorReader

class SerialSensorReader(SensorReader):
	def __init__(self,commConfig,sensorName,rowType=(),rateSec=60,db=None,log=None):
		SensorReader.__init__(self,commConfig,sensorName,rowType,rateSec,db,log)
		self.eol = "\n"
		self.delimiterPattern = "\\S+"
		self.delimiter = " "
	def initSensor(self):
		from serial import Serial
		self.serialP = Serial(**self.commConfig) 
		self.resetSensor()
	def stopSensor(self):
		self.serialP.close()
	def getReadings(self):
		readings = []
		ts = str((int(time())/self.rateSec)*self.rateSec)
		while self.serialP.inWaiting():
			line = readlineR(self.serialP,self.eol)
			readings.append(self.delimiter.join([ts, line]))
		readings = [x for x in readings if x != None]
		return readings
	def toRow(self, reading):
		row = re.findall(self.delimiterPattern,reading.strip())
		row = [x.strip() for x in row]
		return row
	def resetSensor(self):
		print "Reseting Sensor:" + self.sensorName
		return readlineR(self.serialP,self.eol)
开发者ID:sethfowler,项目名称:particlecamp,代码行数:27,代码来源:dataservice.py


示例19: SerialInstrument

class SerialInstrument(object):

    def __init__(self, port, baud=19200):
        self.port = port
        self.baud = baud

    def read(self, num=None):
        if num:
            while self.instr.inWaiting() < num:
                pass
            return self.instr.read(num)
        while not self.instr.inWaiting():
            pass
        return self.instr.read()
        
    def write(self, data):
        self.instr.write(data)

    def __enter__(self):
        self.instr = Serial(self.port, self.baud)
        self.instr.open()
        return self

    def __exit__(self, type, value, tb):
        self.instr.close()
开发者ID:allevitan,项目名称:instruments,代码行数:25,代码来源:instruments.py


示例20: testSendAndReceive2Ports

    def testSendAndReceive2Ports(self):
        """testSendAndReceive2Ports: Creates a PortLogger, serial objects that 
            connect to the PortLogger, and verifies that reads and writes work"""
        from serial import Serial
        from random import randint
        plogger= PortLogger()
        serials = []
        for port_name in plogger.port_names():
            new_serial = Serial(port_name)
            new_serial.timeout = 1
            serials.append(new_serial)

        plogger.start()

        message1 = "This is a test message"
        message2 = ''.join([chr(randint(32,127)) for i in range(24)])
        serials[0].write(message1)
        recvd = serials[1].read(len(message1))
        self.assertEqual(message1, recvd)
        recvd = serials[0].read(len(message1))
        self.assertEqual('', recvd)
        

        serials[1].write(message2)
        recvd = serials[0].read(len(message2))
        self.assertEqual(message2, recvd)
        recvd = serials[1].read(len(message2))
        self.assertEqual('', recvd)
        
        plogger.stop()
开发者ID:madhulika12,项目名称:Virtual-Industrial-Control-System-Testbed-Developement,代码行数:30,代码来源:portlogger.py



注:本文中的serial.Serial类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python list_ports.comports函数代码示例发布时间:2022-05-27
下一篇:
Python serial.write函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap