• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python xbee.XBee类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 xbee.XBee 类的典型用法代码示例。如果您正想了解 XBee 类的具体用法、使用方式或使用示例,那么这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了XBee类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main():
    """Log XBee frames to ``flight_data.csv`` until interrupted.

    Opens the serial port, writes a header row built from the module-level
    ``headers`` sequence, then appends one CSV row per received frame.  Each
    frame's ``rf_data`` is unpacked with the fixed struct layout below.

    A ``serial.SerialException`` raised anywhere in the function is reported
    with a single message; Ctrl-C stops the loop cleanly.
    """
    try:
        ser = serial.Serial('/dev/ttyUSB0', 38400)
        try:
            xbee = XBee(ser)
            print('xbee created/initialized')

            with open('flight_data.csv', 'w') as outfile:
                outfile.write(','.join(headers))
                outfile.write('\n')

            while True:
                packet = xbee.wait_read_frame()
                data = struct.unpack('qddfffffffffhhhcx', packet['rf_data'])
                # Re-open per packet so every row reaches the file before the
                # next blocking read.  Plain append mode: the original 'a+w'
                # is not a valid mode string.
                with open('flight_data.csv', 'a') as outfile:
                    outfile.write(','.join([str(i) for i in data]))
                    outfile.write('\n')

        except KeyboardInterrupt:
            print('Process stopped by user')

        finally:
            # Runs for every exit path once the port has been opened.
            ser.close()

    except serial.SerialException:
        print('failed to initialize serial connection')
开发者ID:yingted,项目名称:read_xbee,代码行数:28,代码来源:read_xbee.py


示例2: main

def main():
    """Read one XBee frame and store its readings in the ``outdoor`` table.

    The frame's ``rf_data`` is expected to start with two native-endian
    floats: pressure in bytes 0-3 and temperature in bytes 4-7.  The RSSI
    byte is stored as a negative integer.
    """
    ser = serial.Serial(SERIALPORT, BAUDRATE, timeout=60)
    try:
        xbee = XBee(ser)

        # Receive one data frame from the xbee
        xb = xbee.wait_read_frame()

        # Record the time for the reading
        logtime = datetime.datetime.now()
    finally:
        # The port is only needed for this single read.
        ser.close()

    # Split out the xbee data packet into variables that are easy to work with
    rssi = -1 * int(binascii.hexlify(xb['rssi']), 16)
    data = xb['rf_data']
    pressure = struct.unpack('f', data[0:4])[0]
    temperature = struct.unpack('f', data[4:8])[0]

    # Post to database
    con = mdb.connect(host='localhost', db='monitor', user='crblackw')
    try:
        with con:
            cur = con.cursor()
            cur.execute("INSERT INTO "
                        "outdoor (datetime, rssi, temperature, pressure) "
                        "VALUES (%s, %s, %s, %s)",
                        (logtime, rssi, temperature, pressure))
    finally:
        con.close()
开发者ID:crblackw,项目名称:home_monitor,代码行数:30,代码来源:query.py


示例3: main

def main():
    """
    Send the API AT command 'DH' to an XBee Series 1 and print the next
    two frames read from the serial port.

    NOTE(review): the original description said this reads the lower-order
    address bits; confirm that 'DH' is the intended command.
    """
    # Open the port before the try block: if opening fails there is nothing
    # to close, and the real exception must not be masked by the cleanup.
    ser = serial.Serial('/dev/cu.usbserial-DA00T2BX', 9600)
    try:
        # Create XBee Series 1 object
        xbee = XBee(ser)

        # Send AT packet
        xbee.send('at', frame_id='A', command='DH')

        # Wait for, and print, two frames
        for _ in range(2):
            response = xbee.wait_read_frame()
            print(response)
    except KeyboardInterrupt:
        pass
    finally:
        ser.close()
开发者ID:kamedono,项目名称:raspi-camera-research,代码行数:29,代码来源:serial_sample.py


示例4: BasestationStream

class BasestationStream(threading.Thread):
    """Daemon thread bridging an XBee serial link and an AsynchDispatch.

    Frames read from the radio are wrapped in ``Packet`` objects and
    dispatched either to the sink registered for the frame's source address
    or to the generic ``'packet'`` channel.  Messages arriving on the
    ``'packet'`` callback are transmitted through the radio.

    Arguments:
        port -- serial port name
        baudrate -- serial baud rate
        addr -- 16-bit address stamped on received packets as ``dest_addr``
        sinks -- passed through to ``AsynchDispatch``
        autoStart -- start the reader thread at the end of construction
    """

    def __init__(self, port='COM1', baudrate=230400, addr=0x2071, sinks=None, autoStart=True):
        threading.Thread.__init__(self)
        self.daemon = True

        self.robots = {}
        # Defined up front so both attributes exist even before the first
        # frame arrives or when the port cannot be opened.
        self.last_time = None
        self.ser = None

        try:
            self.ser = serial.Serial(port, baudrate, timeout=3, rtscts=0)
            self.xb = XBee(self.ser)
        except serial.serialutil.SerialException:
            print("Could not open serial port:%s" % port)
            self.xb = None

        self.dispatcher = AsynchDispatch(sinks=sinks,
                                         callbacks={'packet': [self.send]})

        self.addr = addr

        if autoStart:
            self.start()

    def run(self):
        """Read frames forever and hand each one to receive_callback()."""
        if self.xb is not None:
            while True:
                data = self.xb.wait_read_frame()
                self.receive_callback(data)

    def exit(self):
        """Halt the radio object and close the serial port, if open."""
        if self.xb is not None:
            self.xb.halt()
            self.ser.close()
            self.xb = None

    def put(self, message):
        """Forward *message* to the dispatcher."""
        self.dispatcher.put(message)

    def receive_callback(self, xbee_data):
        """Wrap a received frame in a Packet and dispatch it by source address."""
        self.last_time = time.time()
        pkt = Packet(dest_addr=self.addr, time=self.last_time,
                     payload=xbee_data.get('rf_data'))

        # Addresses are unsigned 16-bit values; a signed format would turn
        # addresses >= 0x8000 negative and never match a registered sink.
        source_addr = unpack('>H', xbee_data.get('source_addr'))[0]

        if source_addr in self.dispatcher.sinks.keys():
            self.dispatcher.dispatch((source_addr, pkt))
        else:
            self.dispatcher.dispatch(('packet', pkt))

    def send(self, message):
        """Transmit ``message.data`` (a packet with dest_addr and payload)."""
        if self.xb is not None:
            pkt = message.data
            # Unsigned format so the full 0x0000-0xFFFF range can be packed.
            self.xb.tx(dest_addr=pack('>H', pkt.dest_addr), data=pkt.payload)

    def register_robot(self, robot, addr):
        """Route frames whose source address is *addr* to ``robot.put``."""
        self.dispatcher.add_sinks({addr: [robot.put]})
开发者ID:abuchan,项目名称:imageproc_py,代码行数:58,代码来源:basestation_stream.py


示例5: Communicator

class Communicator:
    """Main communication class handles serial connection with the XBee module.

    Arguments:
        device -- Path to serial device (default: /dev/ttyO1)
        baudrate -- Serial baudrate (default: 9600)

    All received packet data is placed into the rx_queue.
    Transmission is handled by _packet_sender function.
    """

    # Private marker queued by close() to wake the sender thread so it can
    # stop even when no further packet is ever sent.
    _STOP = object()

    def __init__(self, device='/dev/ttyO1', baudrate=9600):
        self.device    = device
        self.baudrate  = baudrate
        self.kill      = False

        self.rx_queue  = Queue()
        self.tx_queue  = Queue()

        self.serial    = Serial(self.device, self.baudrate)
        self.xbee      = XBee(self.serial, callback=self._packet_handler)

        self.tx_thread = Thread(target=self._packet_sender, args=())
        self.tx_thread.daemon = True
        self.tx_thread.start()

    def _packet_handler(self, data):
        """Internal serial callback adds data to rx_queue. No return."""
        self.rx_queue.put(data)

    def _packet_sender(self):
        """Internal threaded packet transmission loop.

        Reads outgoing packets from tx_queue and sends them over serial.
        Packets longer than 100 bytes are reported through WARN and dropped.
        Returns once close() has been called.
        """
        while not self.kill:
            packet = self.tx_queue.get()
            self.tx_queue.task_done()

            # Re-check after the blocking get(): close() may have run while
            # we were waiting, and the serial port is then already closed.
            if packet is self._STOP or self.kill:
                return

            if len(packet) > 100:
                WARN("Trying to send too large of a packet: %d" % len(packet))
                continue
            self.xbee.tx(dest_addr=packet[:2], data=packet[2:])

    def send(self, data):
        """Add data packet to transmission queue. No return.

        Packet data must be properly formatted.
        [2 byte dst_addr][payload]
        """
        self.tx_queue.put(data)

    def close(self):
        """Cleanup communication pipes and threads. No return."""
        # Stop the sender first so it cannot transmit on a closed port.
        self.kill = True
        self.tx_queue.put(self._STOP)
        self.xbee.halt()
        self.serial.close()
开发者ID:jorik041,项目名称:Andrena,代码行数:58,代码来源:base.py


示例6: get_xbee

def get_xbee(port):
    """Open the radio configured under ``XBEES[port]``.

    Returns a ``(xbee, serial_port)`` tuple; the XBee object carries the
    configured identifier in its ``id`` attribute.  The caller owns both
    objects and is responsible for closing the serial port.

    Raises KeyError when *port* or one of its ``'port'``/``'id'`` entries is
    missing from ``XBEES``.
    """
    config = XBEES[port]
    # Read both settings before touching the hardware so a bad configuration
    # cannot leave an opened serial port behind.
    device = config['port']
    xbee_id = config['id']

    serial_port = serial.Serial(device, 9600)
    xbee = XBee(serial_port)
    xbee.id = xbee_id
    return (xbee, serial_port)
开发者ID:IsidreSole,项目名称:xbeemap,代码行数:9,代码来源:get_rssi_map.py


示例7: BaseStation

class BaseStation(object):
    """Serial/XBee link to a target, exchanging ``Payload`` messages.

    Arguments:
        port, baud -- serial port settings
        dest_addr -- destination address passed to every ``tx`` call
        call_back -- optional callback handed to the XBee object
    """

    # Number of data items carried by each frame sent from write().
    CHUNK_SIZE = 80

    def __init__(self, port, baud, dest_addr=None, call_back=None):
        try:
            self.ser = Serial(port, baud, timeout=1)
            if not self.ser.isOpen():
                raise SerialException("")
            if call_back is None:
                self.xb = XBee(self.ser)
            else:
                self.xb = XBee(self.ser, callback=call_back)
        except (AttributeError, SerialException):
            print("Unable to open a connection to the target. Please  verify your basestation is enabled and properly configured.")
            raise

        self.ser.writeTimeout = 5
        self.dest_addr = dest_addr

    def close(self):
        """Halt the XBee object and close the port; errors are re-raised."""
        try:
            self.xb.halt()
            self.ser.close()
        except (AttributeError, SerialException):
            print("Serial Exception")
            raise

    def send(self, status, type, data):
        """Join *data* into one Payload and transmit it to ``dest_addr``."""
        pld = Payload("".join(data), status, type)
        self.xb.tx(dest_addr=self.dest_addr, data=str(pld))

    def write(self, data):
        """Send *data* in CHUNK_SIZE pieces with status 0 and type 0.

        Sleeps 50 ms after each frame.  Empty *data* sends nothing.
        """
        status = 0x00
        pkt_type = 0x00
        for start in range(0, len(data), self.CHUNK_SIZE):
            self.send(status, pkt_type, data[start:start + self.CHUNK_SIZE])
            time.sleep(0.05)

    def read(self):
        """Block for one frame and return the ``data`` of its Payload."""
        packet = self.xb.wait_read_frame()
        return Payload(packet.get("rf_data")).data
开发者ID:TPWT,项目名称:dynaroach,代码行数:57,代码来源:basestation.py


示例8: BaseStation

class BaseStation(object):
    """XBee base station that exchanges ``Payload`` messages with a target.

    Arguments:
        port, baud -- serial port settings
        dest_addr -- destination address used by send()
        call_back -- optional callback handed to the XBee object
    """

    def __init__(self, port, baud, dest_addr=None, call_back=None):
        self.ser = Serial(port, baud, timeout=1)
        self.ser.writeTimeout = 5

        if call_back is None:
            self.xb = XBee(self.ser)
        else:
            self.xb = XBee(self.ser, callback=call_back)

        self.dest_addr = dest_addr

    def close(self):
        """Halt the XBee object and close the port.

        A SerialException is reported on stdout and not propagated.
        """
        try:
            self.xb.halt()
            self.ser.close()
        except SerialException:
            print("SerialException")

    def send(self, status, type, data):
        """Join *data* into one Payload and transmit it to ``dest_addr``."""
        pld = Payload(''.join(data), status, type)
        self.xb.tx(dest_addr=self.dest_addr, data=str(pld))

    def write(self, data):
        """Send *data* in pieces of at most 80 items, status 0 and type 0.

        Sleeps 50 ms after each frame.  Empty *data* sends nothing.
        """
        chunk = 80
        status = 0x00
        pkt_type = 0x00

        for offset in range(0, len(data), chunk):
            self.send(status, pkt_type, data[offset:offset + chunk])
            time.sleep(0.05)

    def read(self):
        """Block for one frame and return the ``data`` of its Payload."""
        packet = self.xb.wait_read_frame()
        pld = Payload(packet.get('rf_data'))
        return pld.data
开发者ID:Humhu,项目名称:bootloader,代码行数:57,代码来源:basestation.py


示例9: main

def main():
    """Print every frame received from the XBee until Ctrl-C is pressed."""
    # NOTE(review): the port name is an empty string in this sample; it
    # presumably has to be replaced with a real device name before use.
    ser = serial.Serial('', 9600)
    try:
        xbee = XBee(ser)
        while True:
            response = xbee.wait_read_frame()
            print(response)
    except KeyboardInterrupt:
        pass
    finally:
        # Close the port on every exit path, not only on Ctrl-C.
        ser.close()
开发者ID:planset,项目名称:samples,代码行数:10,代码来源:a.py


示例10: __init__

    def __init__(self, port, baud, dest_addr=None, call_back=None):
        """Open the serial port and create the XBee object on top of it.

        port, baud -- serial port settings (1 s read timeout, 5 s write timeout)
        dest_addr  -- stored as-is on the instance
        call_back  -- when given, passed to XBee as its ``callback``
        """
        self.ser = Serial(port, baud, timeout=1)
        self.ser.writeTimeout = 5

        # Identity test: only a missing callback selects the plain constructor.
        if call_back is None:
            self.xb = XBee(self.ser)
        else:
            self.xb = XBee(self.ser, callback=call_back)

        self.dest_addr = dest_addr
开发者ID:biomimetics,项目名称:imageproc_py,代码行数:10,代码来源:basestation.py


示例11: _send_command

    def _send_command(self, data):
        """Transmit *data* to PEGGY_ADDRESS over a freshly opened port.

        Returns True when the ``tx`` call completed and False when it raised.
        The serial port is closed on every path.
        """
        ser = Serial(SERIAL_PORT, BAUD_RATE)
        try:
            xbee = XBee(ser)
            try:
                xbee.tx(dest_addr=PEGGY_ADDRESS, data=data)
            except Exception:
                # Best-effort send; KeyboardInterrupt/SystemExit still propagate.
                return False
            return True
        finally:
            ser.close()
开发者ID:pierreca,项目名称:Tyler,代码行数:10,代码来源:peggy.py


示例12: loop

def loop():
    """Run the keyboard-driven control session over an XBee link.

    With no command-line arguments the built-in defaults are used; with
    exactly three arguments (COM BAUD ADDR, ADDR in hex) those are used
    instead.  Any other argument count prints a usage message and exits
    with status 1.

    The module-level names ``xb``, ``telem`` and ``coord`` are rebound here.
    The loop polls the keyboard every 10 ms until interrupted or until an
    error occurs; telemetry, radio and port are always shut down afterwards.
    """
    global xb, telem, coord

    DEFAULT_COM_PORT = 'COM7'
    DEFAULT_BAUD_RATE = 57600
    DEFAULT_ADDRESS = '\x10\x21'
    DEFAULT_PAN = 0x1001

    if len(sys.argv) == 1:
        com = DEFAULT_COM_PORT
        baud = DEFAULT_BAUD_RATE
        addr = DEFAULT_ADDRESS
    elif len(sys.argv) == 4:
        com = sys.argv[1]
        baud = int(sys.argv[2])
        addr = pack('>H', int(sys.argv[3], 16))
    else:
        print("Wrong number of arguments. Must be: COM BAUD ADDR")
        sys.exit(1)

    ser = Serial(port=com, baudrate=baud)
    xb = XBee(ser, callback=rxCallback)
    print("Setting PAN ID to " + hex(DEFAULT_PAN))
    xb.at(command='ID', parameter=pack('>H', DEFAULT_PAN))

    comm = CommandInterface(addr, txCallback)
    telem = TelemetryReader(addr, txCallback)
    kbint = KeyboardInterface(comm)
    coord = NetworkCoordinator(txCallback)

    comm.enableDebug()
    telem.setConsoleMode(True)
    telem.setFileMode(True)
    telem.writeHeader()
    coord.resume()

    comm.setSlewLimit(3.0)

    try:
        while True:
            c = None
            if msvcrt.kbhit():
                c = msvcrt.getch()
            kbint.process(c)
            time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except Exception as err:
        # The loop still ends on any error, but the cause is now reported
        # instead of being silently discarded.
        print("Stopping control loop: %r" % (err,))
    finally:
        telem.close()
        xb.halt()
        ser.close()
开发者ID:Humhu,项目名称:python-lib,代码行数:55,代码来源:keyboardWire.py


示例13: xbee_ping

def xbee_ping():
    """Send the message '0\\n' to each drone address once per second, forever.

    Incoming frames are delivered to ``handle_message`` through the XBee
    callback.  A status line is printed after every round.
    """
    link = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)
    radio = XBee(link, callback=handle_message)

    payload = '0\n'
    targets = ['\x00\x02', '\x00\x03']
    # The status line never changes, so format it once.
    status_line = 'Sent {0} to drones {1}'.format(payload, targets)

    while True:
        for target in targets:
            radio.tx(dest_addr=target, data=payload)
        print(status_line)
        time.sleep(1)
开发者ID:kalail,项目名称:queen,代码行数:13,代码来源:__init__.py


示例14: main

def main():
    """
    Configure the local XBee and log the RSSI of every received frame.

    Issues three AT commands ('MY', 'CH', 'ID'), then loops forever: each
    frame is printed, and its index and RSSI are appended to data.csv.
    Ctrl-C stops the loop; the file and the serial port are always closed.
    """
    ser = serial.Serial('/dev/ttyUSB0', 57600)
    try:
        xbee = XBee(ser)
        rid = zc_id.get_id()
        rid = rid.split("/", 1)[1]
        # NOTE(review): these calls pass `frame=`; python-xbee examples
        # usually name this field `frame_id`. Left unchanged because the read
        # loop below assumes every frame carries 'rssi' - confirm before
        # renaming.
        xbee.at(frame='A', command='MY', parameter='\x20' + chr(int(rid)))
        xbee.at(frame='B', command='CH', parameter='\x0e')
        xbee.at(frame='C', command='ID', parameter='\x99\x99')

        with open("data.csv", "w") as f:
            i = 0
            while True:
                response = xbee.wait_read_frame()
                print(response)
                lastRSSI = ord(response.get('rssi'))
                lastAddr = response.get('source_addr')
                print("RSSI = -%d dBm @ %d at index %d" % (lastRSSI, ord(lastAddr[1]), i))
                f.write(str(i) + ", -" + str(lastRSSI) + "\n")
                i += 1
    except KeyboardInterrupt:
        pass
    finally:
        ser.close()
开发者ID:jlamyi,项目名称:zumy,代码行数:29,代码来源:receiver.py


示例15: main

def main(argv):
    """Parse -s/-c/-p options, send one AT command and print the reply.

    argv -- option list without the program name.  Recognised options:
            -s/--SerialNumber, -c/--Command, -p/--Parameter (hex string),
            -h/--help.

    Exits with status 2 on an unknown option and with the default status
    after printing help.
    """
    import binascii  # local import: only needed to decode the hex parameter

    # Command line argument parsing
    SerialNumber = ''
    Command = ''
    Parameter = ''

    try:
        # 'h' must be declared here, otherwise -h is rejected before the
        # help branch below can ever run.
        opts, _ = getopt.getopt(argv, "hs:c:p:",
                                ["help", "SerialNumber=", "Command=", "Parameter="])
    except getopt.GetoptError:
        print('argtest.py -s SerialNumber -c Command -p Parameter')
        sys.exit(2)

    for opt, arg in opts:
        # getopt reports long options with their leading "--".
        if opt in ("-h", "--help"):
            print('test.py -s <SerialNumber> -c <Command> -p <Parameter>')
            sys.exit()
        elif opt in ("-s", "--SerialNumber"):
            SerialNumber = arg
        elif opt in ("-c", "--Command"):
            Command = arg
        elif opt in ("-p", "--Parameter"):
            Parameter = arg

    print('Serial Number is: %s' % (SerialNumber,))
    print('Command is: %s' % (Command,))
    print('Parameter is: %s' % (Parameter,))

    # XBee transmit part
    PORT = '/dev/ttyAMA0'
    BAUD = 9600

    print('start')

    ser = Serial(PORT, BAUD)
    try:
        print('1')
        xbee = XBee(ser)
        print('2')
        # Send the AT command. An empty parameter only queries the value; a
        # hex parameter sets it.
        # NOTE(review): SerialNumber is printed above but not used when
        # sending - confirm whether a remote command was intended.
        xbee.at(frame_id='A', command=Command, parameter=binascii.unhexlify(Parameter))
        print('3')
        # Wait for and get the response
        frame = xbee.wait_read_frame()

        print('4')
        print(frame)
    finally:
        ser.close()
开发者ID:henrib128,项目名称:uvic-home-auto,代码行数:49,代码来源:txco.py


示例16: connect

 def connect(self):
     """
     Creates an Xbee instance

     Builds a DigiMesh object when radio_type is 'DigiMesh'; 'IEEE' and any
     other value build an XBee object.  The instance is stored in self.xbee.
     Returns True on success and False if anything raised.
     """
     try:
         self.log(logging.INFO, "Connecting to Xbee")
         radio_cls = DigiMesh if self.radio_type == 'DigiMesh' else XBee
         self.xbee = radio_cls(self.serial, callback=self.process, escaped=True)
     except Exception:
         # Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
         return False
     return True
开发者ID:CasyWang,项目名称:CloudPan,代码行数:15,代码来源:xbee_wrapper.py


示例17: getSensorReading

def getSensorReading(serialPort):
    """
    Read one frame from the sensor.

    serialPort: A string with the port name where the XBee receiver is connected.
    On Windows this is something like 'COM3', 'COM4', etc.
    On Linux/Unix it should be something like '/dev/ttyUSB0', '/dev/ttyUSB1', etc.

    Returns the received frame with an extra "timestamp" entry holding
    datetime.now() at the time of reading.
    """
    ser = serial.Serial(serialPort, 9600)  # port the receiver is connected to
    try:
        response = XBee(ser).wait_read_frame()  # get reading
        response["timestamp"] = datetime.now()
    finally:
        # Release the port even when the read fails or is interrupted.
        ser.close()
    return response
开发者ID:kapilsinha,项目名称:Summer2015,代码行数:15,代码来源:graph_data_wdb.py


示例18: __init__

	def __init__(self, callback=None, read_timeout=None, write_timeout=None):
		"""Open /dev/ttyUSB0 at 9600 baud; use XBee API mode only with a callback.

		With a truthy callback an unescaped XBee object is created on the
		port and ``self.api`` is True.  Otherwise only the serial port is
		opened and ``self.api`` is False.
		"""
		self.port = serial.Serial(
			'/dev/ttyUSB0',
			9600,
			timeout=read_timeout,
			writeTimeout=write_timeout,
		)
		if not callback:
			self.api = False
			return
		self.xbee = XBee(self.port, callback=callback, escaped=False)
		self.api = True
开发者ID:kalail,项目名称:queen,代码行数:7,代码来源:link.py


示例19: XBeeDataPublisher

class XBeeDataPublisher(object):
    """Abstract base that reads XBee frames and publishes them on a ROS topic.

    Subclasses implement ``_convert_xbee_data`` to turn a raw frame into the
    message stored in ``_pub_data``.
    """
    __metaclass__ = ABCMeta  # abstract class

    def __init__(self, pub_data):
        """Open the XBee port from ROS parameters and create the publisher.

        pub_data -- initial message object; its class is the topic type.
        """
        xbee_port = rospy.get_param('~xbee_port', default='/dev/ttyUSB0')
        xbee_baudrate = rospy.get_param('~xbee_baudrate', default=9600)
        link = serial.Serial(xbee_port, xbee_baudrate)

        self._xbee = XBee(link)
        self._pub_data = pub_data
        self._data_pub = rospy.Publisher(
            DEFAULT_XBEE_DATA_PUB_TOPIC,
            self._pub_data.__class__,
            queue_size=1,
        )
        self.is_activated = False

    def activate(self):
        """Mark the publisher as activated."""
        self.is_activated = True

    @abstractmethod
    def _convert_xbee_data(self, binary_data):
        """Default conversion: store the raw frame unchanged."""
        self._pub_data = binary_data

    def publish_data(self):
        """Activate if needed, read one frame, convert it and publish it."""
        if not self.is_activated:
            self.activate()
        # The original author notes that wait_read_frame() is not available
        # on the XBee object in use, hence the private _wait_for_frame().
        frame = self._xbee._wait_for_frame()
        self._convert_xbee_data(frame)
        self._data_pub.publish(self._pub_data)
开发者ID:mu-777,项目名称:ros-xbee_test,代码行数:27,代码来源:receive_data_from_xbee.py


示例20: __init__

    def __init__(self, serial_port):
        """Open the radio on *serial_port*, configure it and prepare the threads.

        The robot id is the part of ``zc_id.get_id()`` after the first "/".
        Three AT commands ('MY', 'CH', 'ID') are issued.  The transmit and
        receive threads are created as daemons but are not started here.
        """
        self.ser = serial.Serial(serial_port, 57600)
        self.xbee = XBee(self.ser)

        full_id = zc_id.get_id()
        self.rid = full_id.split("/", 1)[1]
        self.id = chr(int(self.rid))

        # Radio configuration: 'MY' is 0x20 followed by the id byte.
        self.xbee.at(frame='A', command='MY', parameter='\x20' + self.id)
        self.xbee.at(frame='B', command='CH', parameter='\x0e')
        self.xbee.at(frame='C', command='ID', parameter='\x99\x99')

        # Worker threads; both are daemons.
        tx_thread = threading.Thread(target=self.transmit_loop)
        tx_thread.daemon = True
        self.updateTransmitThread = tx_thread
        rx_thread = threading.Thread(target=self.receive_loop)
        rx_thread.daemon = True
        self.updateReceiveThread = rx_thread
        self.transmit_peroid = 0.01     # in seconds

        # No frame is read during construction; reception fields start at 0.
        self.response = 0
        self.rssi = 0
        self.addr = 0
        self.data = 0
        self.sendMessage = ''
        self.transmit = True

        self.pktNum = 0
        self.sendingCommand = False
        self.ser.flush()
开发者ID:jlamyi,项目名称:zumy,代码行数:30,代码来源:Xbee.py



注:本文中的xbee.XBee类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python xbee.ZigBee类代码示例发布时间:2022-05-26
下一篇:
Python instance.InstanceManager类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap