本文整理汇总了Python中xbee.frame.APIFrame类的典型用法代码示例。如果您正苦于以下问题:Python APIFrame类的具体用法?Python APIFrame怎么用?Python APIFrame使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了APIFrame类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: txXBee
class txXBee(protocol.Protocol, ZigBee):
    """
    Twisted protocol that assembles received bytes into XBee API frames.

    Bytes are ignored until APIFrame.START_BYTE is seen; from then on they
    are fed into an APIFrame until it reports no remaining bytes, at which
    point the frame is parsed and dispatched.
    """

    def __init__(self, escaped=True):
        """
        escaped: forwarded to every APIFrame this protocol creates.
        """
        self._frame = None
        self._escaped = escaped

    def dataReceived(self, data):
        """
        Consume a chunk of received data one element at a time.

        A completed frame is parsed and passed (after _split_response) to
        handle_packet. A ValueError raised anywhere in that step routes the
        frame's raw data to handle_badframe instead. Either way the
        protocol goes back to waiting for the next start byte.
        """
        for octet in data:
            if not self._frame:
                # Idle: drop everything that is not a start delimiter.
                if octet == APIFrame.START_BYTE:
                    self._frame = APIFrame(escaped=self._escaped)
                    self._frame.fill(octet)
                continue

            self._frame.fill(octet)
            if self._frame.remaining_bytes() != 0:
                continue

            try:
                self._frame.parse()
                packet = self._split_response(self._frame.data)
                self.handle_packet(packet)
            except ValueError:
                self.handle_badframe(self._frame.raw_data)
            # Frame consumed (good or bad): wait for the next start byte.
            self._frame = None

    def handle_packet(self, packet):
        """Hook for subclasses; called with each decoded packet."""
        pass

    def handle_badframe(self, packet):
        """Hook for subclasses; called with the raw data of a bad frame."""
        pass

    def _write(self, data):
        """Wrap data in an API frame and write it to the transport."""
        encoded = APIFrame(data, self._escaped).output()
        self.transport.write(encoded)
开发者ID:mmattice,项目名称:txXBee,代码行数:32,代码来源:protocol.py
示例2: _wait_for_frame
def _wait_for_frame(self, timeout=None):
    """
    _wait_for_frame: timeout -> APIFrame

    Read from the serial port until a valid, non-empty API frame
    arrives, then return the parsed frame object.

    timeout: optional number of seconds. When it is a positive number,
    _TimeoutException is raised if the port has no data waiting and
    the deadline has passed. The deadline is only checked while waiting
    for data, not while the body of a frame is being read.

    If a callback is set and self._thread_continue is False, the
    method exits by raising ThreadQuitException.
    """
    frame = APIFrame(escaped=self._escaped)

    deadline = 0
    if timeout is not None and timeout > 0:
        deadline = time.time() + timeout

    while True:
        if self._callback and not self._thread_continue:
            raise ThreadQuitException

        if self.serial.inWaiting() == 0:
            if deadline and time.time() > deadline:
                raise _TimeoutException
            time.sleep(.01)
            continue

        byte = self.serial.read()
        if byte != APIFrame.START_BYTE:
            continue

        # The byte just matched the start delimiter, so it is known to be
        # non-empty and can be stored directly.
        frame.fill(byte)

        # Save all following bytes, skipping empty reads
        while frame.remaining_bytes() > 0:
            byte = self.serial.read()
            if len(byte) == 1:
                frame.fill(byte)

        try:
            # Try to parse and return result
            frame.parse()

            # Ignore empty frames. The replacement frame must keep the
            # configured escaping mode, otherwise the next frame would be
            # decoded with the APIFrame default instead.
            if len(frame.data) == 0:
                frame = APIFrame(escaped=self._escaped)
                continue

            return frame
        except ValueError:
            # Bad frame, so restart
            frame = APIFrame(escaped=self._escaped)
开发者ID:nioinnovation,项目名称:python-xbee,代码行数:56,代码来源:base.py
示例3: test_invalid_checksum
def test_invalid_checksum(self):
    """
    parsing a frame that fails validation must raise ValueError
    """
    raw = b'\x7E\x00\x01\x00\xF6'
    api_frame = APIFrame()

    for octet in raw:
        api_frame.fill(intToByte(byteToInt(octet)))

    with self.assertRaises(ValueError):
        api_frame.parse()
开发者ID:ymichael,项目名称:xbns,代码行数:11,代码来源:test_frame.py
示例4: test_unescape_input
def test_unescape_input(self):
    """
    An escaped byte pair fed to an escaped APIFrame must be stored
    as the single unescaped byte
    """
    escaped_input = b'\x7D\x23'
    frame = APIFrame(escaped=True)

    # Slice rather than index so each piece is a one-byte bytes object.
    for idx in range(len(escaped_input)):
        frame.fill(escaped_input[idx:idx + 1])

    self.assertEqual(frame.raw_data, b'\x03')
开发者ID:ymichael,项目名称:xbns,代码行数:11,代码来源:test_frame.py
示例5: _wait_for_frame
def _wait_for_frame(self, timeout=None):
    """
    _wait_for_frame: timeout -> APIFrame

    Read from the serial port until a valid API frame arrives, then
    return the parsed frame object.

    timeout: optional number of seconds. When positive,
    TimeoutException is raised if no start byte is seen before the
    deadline. The deadline is restarted once a start byte arrives and
    then applies to the rest of the frame.

    If self._exit is set, the method exits by raising
    ThreadQuitException.
    """
    frame = APIFrame(escaped=self._escaped)

    deadline = 0
    if timeout is not None and timeout > 0:
        deadline = time.time() + timeout

    while True:
        if self._exit.is_set():
            raise ThreadQuitException

        byte = self.serial.read()
        if byte != APIFrame.START_BYTE:
            if deadline and time.time() > deadline:
                raise TimeoutException
            continue

        # A frame has started: give the remainder a fresh deadline.
        if timeout is not None and timeout > 0:
            deadline = time.time() + timeout

        frame.fill(byte)  # Save all following bytes
        while frame.remaining_bytes() > 0:
            if self._exit.is_set():
                raise ThreadQuitException

            if self.serial.inWaiting() < 1 and deadline and time.time() > deadline:
                # Call form of raise: valid on Python 2 and Python 3
                # (the "raise Exc, value" form is Python-2-only syntax).
                # NOTE(review): frame.data is read before parse(); confirm
                # it is a sized value at this point.
                raise TimeoutException(len(frame.data))

            chunk = self.serial.read(frame.remaining_bytes())
            # Slice instead of iterating so each element is a one-byte
            # string on both Python 2 and Python 3.
            for idx in range(len(chunk)):
                frame.fill(chunk[idx:idx + 1])

        try:
            # Try to parse and return result
            frame.parse()
            return frame
        except ValueError:
            # Bad frame, so restart
            frame = APIFrame(escaped=self._escaped)
开发者ID:hoopes,项目名称:python-xbee,代码行数:52,代码来源:base.py
示例6: rawDataReceived
def rawDataReceived(self, data):
    """
    Feed a chunk of raw data into the current API frame.

    A chunk that begins with APIFrame.START_BYTE starts a new frame.
    Every byte of the chunk is then added to the frame; once the frame
    reports no remaining bytes it is parsed and the decoded packet is
    passed to self.handle_packet, whose result is returned.

    Returns None when the chunk is empty, the frame is still
    incomplete, no handle_packet attribute exists, or the frame was bad
    (in which case a fresh frame is started).

    NOTE(review): the original indentation was lost; this assumes the
    completeness check runs once after the whole chunk has been added.
    """
    if not data:
        # Nothing to add; also avoids indexing into an empty chunk.
        return None

    # Slicing yields a one-byte string on both Python 2 and Python 3.
    if data[:1] == APIFrame.START_BYTE:
        self.frame = APIFrame(escaped=self._escaped)

    for idx in range(len(data)):
        self.frame.fill(data[idx:idx + 1])

    if self.frame.remaining_bytes() > 0:
        return None

    try:
        # Try to parse and return result
        self.frame.parse()
        handler = getattr(self, "handle_packet", None)
        if handler is None:
            # The handler is optional; do not attempt to call None.
            return None
        return handler(self._split_response(self.frame.data))
    except ValueError:
        # Bad frame, so restart
        self.frame = APIFrame(escaped=self._escaped)
        return None
开发者ID:LukaszSwolkien,项目名称:open-zb-home,代码行数:13,代码来源:protocol.py
示例7: test_single_byte
def test_single_byte(self):
    """
    a frame carrying one data byte must parse to exactly that byte
    """
    raw = b'\x7E\x00\x01\x00\xFF'
    api_frame = APIFrame()

    for octet in raw:
        as_byte = intToByte(byteToInt(octet))
        api_frame.fill(as_byte)
    api_frame.parse()

    self.assertEqual(api_frame.data, b'\x00')
开发者ID:ymichael,项目名称:xbns,代码行数:14,代码来源:test_frame.py
示例8: _wait_for_frame
def _wait_for_frame(self):
    """
    _wait_for_frame: None -> APIFrame

    Read from the serial port until a valid, non-empty API frame
    arrives, then return the parsed frame object.

    If a callback is set and self._thread_continue is False, the
    method exits by raising ThreadQuitException.
    """
    frame = APIFrame(escaped=self._escaped)

    while True:
        if self._callback and not self._thread_continue:
            raise ThreadQuitException

        chunk = self.serial.read(1)
        if not chunk:
            # An empty read has no first element to index; go around
            # again so the quit flag is re-checked.
            continue

        byte = chunk[0]
        if byte != APIFrame.START_BYTE:
            continue

        frame.fill(byte)

        # Pull in exactly as many bytes as the frame still asks for.
        while True:
            remaining_bytes = frame.remaining_bytes()
            if remaining_bytes < 1:
                break
            chunk = self.serial.read(remaining_bytes)
            for byte in chunk:
                frame.fill(byte)

        try:
            # Try to parse and return result
            frame.parse()

            # Ignore empty frames. Keep the configured escaping mode on
            # the replacement frame so the next frame is decoded the
            # same way as this one.
            if len(frame.data) == 0:
                frame = APIFrame(escaped=self._escaped)
                continue

            return frame
        except ValueError:
            # Bad frame, so restart
            frame = APIFrame(escaped=self._escaped)
开发者ID:jleben,项目名称:python-xbee,代码行数:48,代码来源:base.py
示例9: test_escape_method
def test_escape_method(self):
    """
    Escaping the start byte must yield the escape byte followed by 0x5e
    """
    escaped = APIFrame.escape(APIFrame.START_BYTE)
    expected = APIFrame.ESCAPE_BYTE + b'\x5e'
    self.assertEqual(escaped, expected)
开发者ID:ymichael,项目名称:xbns,代码行数:7,代码来源:test_frame.py
示例10: test_single_byte
def test_single_byte(self):
    """
    a frame carrying one data byte must parse to exactly that byte
    """
    parsed = APIFrame.parse('\x7E\x00\x01\x00\xFF')
    self.assertEqual(parsed.data, '\x00')
开发者ID:blalor,项目名称:home-automator,代码行数:9,代码来源:test_frame.py
示例11: _wait_for_frame
def _wait_for_frame(self):
    """
    _wait_for_frame: None -> parsed frame

    Poll the serial port until a start byte is seen, collect the two
    length bytes, read the announced payload plus one checksum byte, and
    return the result of APIFrame.parse on the collected data.

    A ValueError from parsing discards the collected data and resumes
    the search for a start byte.

    While searching, if a callback is set and self._thread_continue is
    False, the method exits by raising ThreadQuitException.
    """
    buf = ''
    hunting = True  # True while searching for the start byte

    while True:
        if hunting:
            if self._callback and not self._thread_continue:
                raise ThreadQuitException

            if self.serial.inWaiting() == 0:
                time.sleep(.01)
                continue

            first = self.serial.read()
            # A start byte switches to collecting mode
            if first == APIFrame.START_BYTE:
                buf += first
                hunting = False
            continue

        # Collecting: append one more read to the buffer
        buf += self.serial.read()
        if len(buf) != 3:
            continue

        # Start byte plus both length bytes are in; the length is
        # unpacked as a big-endian short.
        payload_len = struct.unpack("> h", buf[1:3])[0]

        # Read the payload and the trailing checksum in one go
        buf += self.serial.read(payload_len + 1)

        try:
            return APIFrame.parse(buf)
        except ValueError:
            # Bad frame, so restart
            buf = ''
            hunting = True
开发者ID:blalor,项目名称:home-automator,代码行数:53,代码来源:base.py
示例12: _wait_for_frame
def _wait_for_frame(self, msec):
    """
    _wait_for_frame: msec -> APIFrame or None

    Read from the serial port until a valid API frame arrives and
    return the parsed frame object.

    msec: time limit in milliseconds; a negative value disables the
    limit. When the limit has passed, None is returned. The limit is
    only checked at the top of the polling loop, not while the body of
    a frame is being read.

    If a callback is set and self._thread_continue is False, the
    method exits by raising ThreadQuitException.
    """
    frame = APIFrame(escaped=self._escaped)

    has_limit = not msec < 0
    limit = time.time() + msec / 1000.0

    while True:
        if has_limit and time.time() > limit:
            return None

        if self._callback and not self._thread_continue:
            raise ThreadQuitException

        if self.serial.inWaiting() == 0:
            time.sleep(.01)
            continue

        first = self.serial.read()
        if first != APIFrame.START_BYTE:
            continue

        # Store the start byte, then everything the frame still asks for
        frame.fill(first)
        while frame.remaining_bytes() > 0:
            nxt = self.serial.read()
            frame.fill(nxt)

        try:
            frame.parse()
            return frame
        except ValueError:
            # Bad frame, so restart
            frame = APIFrame(escaped=self._escaped)
开发者ID:HuangZhenQiu,项目名称:NanoKong,代码行数:44,代码来源:base.py
示例13: dataReceived
def dataReceived(self, data):
    """
    Consume a chunk of received data one element at a time.

    Elements are ignored until APIFrame.START_BYTE is seen. From then on
    they are added to the current frame until it reports no remaining
    bytes. The completed frame is parsed and passed (after
    _split_response) to handle_packet; a ValueError raised in that step
    sends the frame's raw data to handle_badframe instead. Either way
    the current frame is cleared afterwards.
    """
    for octet in data:
        if not self._frame:
            # Idle: only a start delimiter opens a new frame.
            if octet == APIFrame.START_BYTE:
                self._frame = APIFrame(escaped=self._escaped)
                self._frame.fill(octet)
            continue

        self._frame.fill(octet)
        if self._frame.remaining_bytes() != 0:
            continue

        try:
            self._frame.parse()
            packet = self._split_response(self._frame.data)
            self.handle_packet(packet)
        except ValueError:
            # Bad frame: report it
            self.handle_badframe(self._frame.raw_data)
        self._frame = None
开发者ID:mmattice,项目名称:txXBee,代码行数:17,代码来源:protocol.py
示例14: _process_input
def _process_input(self, data, events):
    """
    _process_input:

    Read one API frame from the serial connection and hand it on.

    The first byte read must be APIFrame.START_BYTE, otherwise nothing
    happens. The rest of the frame is then read byte by byte and parsed.
    A non-empty frame resolves self._frame_future when one is pending
    (and clears it); otherwise the frame is appended to
    self._frame_queue. Empty frames and frames that raise ValueError
    are dropped silently.

    The data and events arguments are not used by this method.
    """
    frame = APIFrame(escaped=self._escaped)

    first = self.serial.read()
    if first != APIFrame.START_BYTE:
        return

    # Store the start byte, then keep reading; empty reads are skipped
    if len(first) == 1:
        frame.fill(first)

    while frame.remaining_bytes() > 0:
        nxt = self.serial.read()
        if len(nxt) == 1:
            frame.fill(nxt)

    try:
        frame.parse()

        # Ignore empty frames
        if len(frame.data) == 0:
            return

        pending = self._frame_future
        if pending is None:
            self._frame_queue.append(frame)
        else:
            pending.set_result(frame)
            self._frame_future = None
    except ValueError:
        return
开发者ID:nioinnovation,项目名称:python-xbee,代码行数:41,代码来源:base.py
示例15: _wait_for_frame
def _wait_for_frame(self):
    """
    _wait_for_frame: None -> APIFrame

    Read the serial port one byte at a time until a valid API frame has
    been collected, then return the parsed frame object.

    Bytes are discarded until APIFrame.START_BYTE is seen. A frame that
    raises ValueError on parsing is thrown away and the search for a
    start byte begins again.

    If a callback is set and self._thread_continue is False, the
    method exits by raising ThreadQuitException.
    """
    frame = APIFrame(escaped=self._escaped)
    in_frame = False  # True once a start byte has been seen

    while True:
        if self._callback and not self._thread_continue:
            raise ThreadQuitException

        # Sleep-poll until the port reports at least one waiting byte
        while self.serial.inWaiting() < 1:
            time.sleep(0.01)

        byte = self.serial.read()
        if byte == '':
            continue

        if not in_frame:
            if byte != APIFrame.START_BYTE:
                continue
            in_frame = True

        frame.fill(byte)

        if frame.remaining_bytes() <= 0:
            try:
                frame.parse()
                return frame
            except ValueError:
                # Bad frame, so restart
                in_frame = False
                frame = APIFrame(escaped=self._escaped)
开发者ID:AhmedAnsariIIT,项目名称:iitmabhiyanros,代码行数:42,代码来源:base.py
示例16: XBee
# Test script: repeatedly wraps a fixed payload in an escaped API frame,
# prints the frame as hex and writes it to the serial port.
# NOTE(review): this excerpt is truncated and its indentation was lost; the
# `try:` below has no visible `except`/`finally` clause.
ser = serial.Serial(PORT, BAUD_RATE)
# Create API object
xbee = XBee(ser, escaped=True)
# DEST_ADDR_LONG = "\x00\x13\xA2\x00\x40\x8B\x35\xAE"
# data = "7E 00 7D 33 10 01 00 00 00 00 00 00 00 00 FF FE 00 00 61 61 61 61 61 0C"
# Header bytes that `data` below starts with.
# NOTE(review): "HADER" looks like a misspelling of "HEADER"; the name is left
# unchanged because other scripts may reference it.
HADER = '\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFE\x00\x00'
# The same header bytes followed by seven 0x61 ('a') payload bytes.
data = '\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFE\x00\x00\x61\x61\x61\x61\x61\x61\x61'
if __name__ == "__main__":
while True:
try:
# Build an escaped API frame around the payload.
frame = APIFrame(data, True)
# Hex rendering of the frame; not used again in the visible part of the script.
send_data = ''.join(['%x' % ord(s) for s in frame.output()])
s2 = binascii.b2a_hex(frame.output())
# Python 2 print statement.
print s2
ser.write(frame.output())
# print frame
# print "send data"
# xbee.tx_long_addr(frame='10', dest_addr=DEST_ADDR_LONG, data='A')
# print xbee.api_commands.frame
# xbee.Send(bytearray.fromhex("7E 00 14 10 01 00 00 00 00 00 00 00 00 FF FE 00 00 61 61 61 61 61 61 AB"))
# ser.write("\x7E\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFE\x00\x00\x61\x61\x61\x61\x61\x61\xAB")
# ch = checksum(data)
# print '%x ' % ord(ch)
开发者ID:kamedono,项目名称:raspi-camera-research,代码行数:30,代码来源:serial_test.py
示例17: test_remaining_bytes
def test_remaining_bytes(self):
    """
    remaining_bytes() must track how many more bytes are needed
    before a packet can be parsed, as bytes are added one at a time
    """
    api_frame = APIFrame()
    frame = b'\x7E\x00\x04\x00\x00\x00\x00\xFF'

    # Nothing added yet
    self.assertEqual(api_frame.remaining_bytes(), 3)

    # Expected value after adding frame[0], frame[1], frame[2], frame[3]
    expected_after_fill = (2, 1, 5, 4)
    for position, expected in enumerate(expected_after_fill):
        api_frame.fill(frame[position])
        self.assertEqual(api_frame.remaining_bytes(), expected)
开发者ID:ymichael,项目名称:xbns,代码行数:17,代码来源:test_frame.py
示例18: APIFrame
# 480以上の送信は止まるので一旦クローズ
if send_count % 470 == 0:
ser.close()
ser = serial.Serial(PORT, BAUD_RATE)
# print "stop"
# send_count = 0
# データを250byteのみ読み込む
data = f.read(250)
# ヘッダーをデータの頭に追加
data = "".join([HADER, data])
# xbeeのaipフレームに合わせる
frame = APIFrame(data, True)
send_data = ''.join(['%x' % ord(s) for s in frame.output()])
# 終了かどうか
if frame.output() == END_POINT:
finish_count += 1
print "end point%d" % send_count
f.close()
f = open(FILENAME, "rb")
if finish_count is 5:
print "finish"
break
# データを転送
ser.write(frame.output())
开发者ID:kamedono,项目名称:raspi-camera-research,代码行数:30,代码来源:serial_send_image.py
注:本文中的xbee.frame.APIFrame类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论