本文整理汇总了Python中serial.write函数的典型用法代码示例。如果您正苦于以下问题:Python write函数的具体用法?Python write怎么用?Python write使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了write函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: measurePointHeight
def measurePointHeight(x, y, initialHeight, feedrate):
    """Probe the bed at (x, y) and return the reported Z value.

    The probe is retracted, the head is moved to Z60 and then to
    (x, y, initialHeight), the probe is lowered and ``G30 U<feedrate>`` is
    sent.  The function then reads serial lines until an
    ``echo:endstops hit: ... Z:<value>`` line arrives.

    Args:
        x, y: Target coordinates, interpolated into the G0 command.
        initialHeight: Z height used for the approach move and returned
            unchanged when probing times out.
        feedrate: Value passed to G30 as its ``U`` parameter.

    Returns:
        On success, the text following ``Z:`` in the reply (a stripped
        string, not converted to a number).  After 80 seconds without an
        endstop line, ``initialHeight`` itself.
    """
    hit_prefix = "echo:endstops hit:"
    travel = ("G0 X" + str(x) + " Y" + str(y) + " Z " + str(initialHeight)
              + " F10000")

    macro("M402", "ok", 2, "Retracting Probe (safety)", 1, verbose=False)
    macro("G0 Z60 F5000", "ok", 5, "Moving to start Z height", 10)  # mandatory!
    macro(travel, "ok", 2, "Moving to left down corner point", 10)
    macro("M401", "ok", 2, "Lowering Probe", 1, warning=True, verbose=False)

    serial.flushInput()
    probe_start_time = time.time()
    serial.write("G30 U" + str(feedrate) + "\r\n")

    while True:
        serial_reply = serial.readline().rstrip()
        # Match on the prefix instead of a fixed-width slice: the previous
        # 22-character slice could never equal the 21-character marker when
        # the reply carried a value after "Z:".
        if serial_reply.startswith(hit_prefix) and "Z:" in serial_reply:
            break
        if time.time() - probe_start_time > 80:  # timeout management
            # This path returns before the M402 below, so the probe is left
            # lowered, exactly as before.
            serial.flushInput()
            return initialHeight

    z = serial_reply.split("Z:")[1].strip()
    serial.flushInput()
    macro(travel, "ok", 2, "Moving to left down corner point", 10)
    macro("M402", "ok", 2, "Raising Probe", 1, warning=True, verbose=False)
    return z
开发者ID:FAB-UI-plugins,项目名称:advancedbedcalibration,代码行数:26,代码来源:manualCalibration.py
示例2: safety_callback
def safety_callback(channel):
    """Report the state of the safety input on GPIO 2.

    If the pin reads LOW, the controller is queried with ``M730`` and the
    number after ``"ERROR : "`` in its reply becomes the error code (100 when
    the reply cannot be parsed).  If the pin reads HIGH, type and code are
    cleared.  The resulting ``{'type', 'code'}`` message is serialised to
    JSON, sent through ``ws`` and handed to ``write_emergency``.

    Any exception is logged at INFO level and swallowed, as before.

    Args:
        channel: Not used by the body.  NOTE(review): presumably the channel
            number supplied by the GPIO event callback - confirm.
    """
    try:
        code = ""
        msg_type = ""  # renamed from `type` to stop shadowing the builtin

        if GPIO.input(2) == GPIO.LOW:
            msg_type = "emergency"
            serial.flushInput()
            serial.write("M730\r\n")
            reply = serial.readline()
            try:
                code = float(reply.split("ERROR : ")[1].rstrip())
            except (IndexError, ValueError):
                # No "ERROR : <number>" field in the reply.
                code = 100

        # The pin is sampled a second time, as in the original code.
        if GPIO.input(2) == GPIO.HIGH:
            msg_type = ""
            code = ""

        payload = json.dumps({'type': str(msg_type), 'code': str(code)})
        ws.send(payload)
        write_emergency(payload)
    except Exception as e:
        logging.info(str(e))
开发者ID:cecilden,项目名称:FAB-UI,代码行数:30,代码来源:monitor.py
示例3: probe
def probe(x, y):
    """Probe the current position with G30 and record the measured point.

    Sends ``G30`` and reads serial lines until one beginning with
    ``echo:endstops hit:`` and containing ``Z:`` arrives.  The value after
    ``Z:`` is parsed as a float and ``[x, y, z, 1]`` is appended as a new row
    to the global ``points_on_plane`` array.

    Args:
        x, y: Coordinates stored with the measured height; they are not sent
            to the printer by this function.

    Returns:
        True when a point was recorded, False when no endstop line was
        received within 90 seconds.
    """
    global points_on_plane
    hit_prefix = "echo:endstops hit:"

    serial.flushInput()
    serial.write("G30\r\n")
    probe_start_time = time.time()

    while True:
        serial_reply = serial.readline().rstrip()
        # Prefix match replaces the old fixed-width slice, which was one
        # character longer than the marker and so never matched a reply that
        # carried a value after "Z:".
        if serial_reply.startswith(hit_prefix) and "Z:" in serial_reply:
            break
        if time.time() - probe_start_time > 90:
            # timeout management
            trace("Could not probe this point")
            return False

    # get the z position
    z = float(serial_reply.split("Z:")[1].strip())
    new_point = [x, y, z, 1]
    # append new point to the cloud
    points_on_plane = np.vstack([points_on_plane, new_point])
    # `deg` is not defined in this function; it must exist at module level.
    trace("Probed " + str(x) + "," + str(y) + " / " + str(deg)
          + " degrees = " + str(z))
    return True
开发者ID:AungWinnHtut,项目名称:FAB-UI,代码行数:26,代码来源:p_scan.py
示例4: dlt645_read_time
def dlt645_read_time(serial, addr, data_tag):
    """Send a DL/T645 read request (control code 0x11) and decode the reply.

    The request is prefixed with four 0xFE bytes.  The reply is read one byte
    at a time until a 0x16 byte is seen or ``SERIAL_TIMEOUT_CNT`` empty reads
    have occurred.

    Args:
        serial: Port object providing ``write(data)`` and ``read(1)``.
        addr: Meter address passed to ``encode_dlt645``.
        data_tag: Data identifier passed to ``encode_dlt645``.

    Returns:
        ``(ret, text)`` on success, where ``text`` is the concatenated digits
        produced by ``bcd2digits(data[4:7])``; ``(-1, 0)`` on timeout, decode
        failure, short data, or any exception.
    """
    try:
        cmd2 = '\xfe\xfe\xfe\xfe' + encode_dlt645(addr, 0x11, 4, data_tag)
        serial.write(cmd2)
        time.sleep(0.5)

        resp = ''
        c = ''
        i = 0
        while c != '\x16' and i < SERIAL_TIMEOUT_CNT:
            c = serial.read(1)
            if len(c) > 0:
                resp += c
            else:
                print('.')
                # NOTE(review): indentation was lost in this listing; the
                # counter is assumed to count empty reads only - confirm.
                i += 1
        if i >= SERIAL_TIMEOUT_CNT:
            return -1, 0

        resp1 = dlt_645_rm_fe(resp)
        ret, addr, data, ctl = decode_dlt645(resp1)
        # need to convert to Unix time stamp
        if ret == 0 and len(data) >= 7:
            digits = list(bcd2digits(data[4:7]))
            return ret, ''.join(str(e) for e in digits)
        return -1, 0
    except Exception:
        # The message previously named dlt645_read_data by mistake.
        print('dlt645_read_time exception!')
        return -1, 0
开发者ID:szdiy,项目名称:duo,代码行数:33,代码来源:read_dlt645.py
示例5: target_flash_start
def target_flash_start():
    """Flash one target with J-Link and update the OK/NG counters.

    Reads the two counters from ``../count/count.txt`` (first line OK, second
    line NG), runs ``JLink.exe`` with the ``flash_nrf.jlink`` commander
    script, and inspects its stdout:

    * "Writing target memory failed" -> writes ``Error`` to the serial port
      and increments the NG counter;
    * "O.K." -> writes ``OK`` to the serial port and increments the OK
      counter;
    * otherwise nothing is sent and the counters are unchanged.

    The counters are then stored with ``WriteCount`` and
    ``target_flash_start_event`` is set.
    """
    # Read-only access is enough here; `with` guarantees the file is closed.
    with open("../count/count.txt", 'r') as counter_file:
        first_line = counter_file.readline()
        second_line = counter_file.readline()
    count_ok = int(first_line)
    count_ng = int(second_line)

    sleep(1)
    child1 = subprocess.Popen(
        ["../tool/JLink.exe", "-CommanderScript", "flash_nrf.jlink"],
        stdout=subprocess.PIPE)
    out = child1.communicate()

    print("**************************")
    print("")
    if "Writing target memory failed" in out[0]:
        print(" flash error ")
        serial.write('Error\r\n')
        count_ng = count_ng + 1
    elif "O.K." in out[0]:
        print(" flash ok ")
        serial.write('OK\r\n')
        count_ok = count_ok + 1
    else:
        print("Didn't flash any chip")
    WriteCount(count_ok, count_ng)
    print("%d/%d" % (count_ok, count_ng))
    print("**************************")
    target_flash_start_event.set()
开发者ID:tangyunzhu,项目名称:Rephone_Firmware_update_Jlink,代码行数:29,代码来源:Program.py
示例6: probe
def probe(x, y):
    """Probe the current position with G30 and record the measured point.

    Sends ``G30``, waits 0.5 s, then polls the serial port every 0.1 s until
    a line beginning with ``echo:endstops hit:`` and containing ``Z:`` is
    read.  The value after ``Z:`` is parsed as a float and ``[x, y, z, 1]``
    is appended as a new row to the global ``points_on_plane`` array.

    Args:
        x, y: Coordinates stored with the measured height and used in trace
            messages; they are not sent to the printer by this function.

    Returns:
        True when a point was recorded, False when 10 seconds passed without
        an endstop line.
    """
    global points_on_plane
    hit_prefix = "echo:endstops hit:"

    serial_reply = ""
    serial.flushInput()  # clean buffer
    serial.write("G30\r\n")  # probing command
    time.sleep(0.5)  # give it some time to start
    probe_start_time = time.time()

    # Prefix match replaces the old fixed-width slice, which was one
    # character longer than the marker and so never matched a reply that
    # carried a value after "Z:".
    while not (serial_reply.startswith(hit_prefix) and "Z:" in serial_reply):
        if time.time() - probe_start_time > 10:
            # 10 seconds passed, no contact
            trace("Could not probe " + str(x) + "," + str(y) + " / "
                  + str(deg) + " degrees")
            return False  # leave the function, caller keeps probing
        serial_reply = serial.readline().rstrip()
        time.sleep(0.1)  # wait

    # get the z position
    z = float(serial_reply.split("Z:")[1].strip())
    new_point = [x, y, z, 1]
    # append new point to the cloud
    points_on_plane = np.vstack([points_on_plane, new_point])
    # `deg` is not defined in this function; it must exist at module level.
    trace("Probed " + str(x) + "," + str(y) + " / " + str(deg)
          + " degrees = " + str(z))
    return True
开发者ID:HrRossi,项目名称:FAB-UI,代码行数:35,代码来源:p_scan.py
示例7: read_speed
def read_speed(motor_num):
    """Query a motor controller for a motor's speed.

    Writes the board address and a read command to the matching serial port,
    reads six single-byte replies, and converts the first four bytes
    (big-endian) into a speed which is scaled by 125/8192.

    Args:
        motor_num: 1, 2 or 3.

    Returns:
        The scaled speed as a float (named rotations per second by the
        original author).

    Raises:
        ValueError: If ``motor_num`` is not 1, 2 or 3.  Previously this case
            failed with an UnboundLocalError.
    """
    if motor_num == 1:
        board_num, command, port = 0x80, 30, ser_x80
    elif motor_num == 2:
        board_num, command, port = 0x81, 30, ser_x81
    elif motor_num == 3:
        # The original author flagged command 31 on board 0x80 as suspicious
        # (possibly a wiring issue); kept unchanged.
        board_num, command, port = 0x80, 31, ser_x80
    else:
        raise ValueError("unknown motor_num: %r" % (motor_num,))

    port.write(chr(board_num))
    port.write(chr(command))
    data = [port.read() for _ in range(6)]
    # data[5] is read but never checked.

    speed = int("".join(byte.encode("hex") for byte in data[:4]), 16)
    if ord(data[4]) == 1 and speed != 0:
        # NOTE(review): this evaluates to speed - 0xFFFFFFFF, which is one
        # more than a 32-bit two's-complement decode (speed - 2**32).
        # Confirm against the controller's data format before changing.
        speed = ~(0xffffffff - speed) + 1
    return float(speed) * 125 / 8192
开发者ID:speedyswimmer1000,项目名称:roboSoccer,代码行数:26,代码来源:s.py
示例8: serWrite
def serWrite(threadName, delay):
    """Send one queued message to the serial port, then reschedule itself.

    If ``serialq`` holds anything, its leftmost entry is removed, written to
    ``serial`` and echoed to stdout with a timestamp.  In every case the
    function re-registers itself through ``real_delay_call`` with the same
    ``delay`` and arguments.
    """
    if serialq:
        outgoing = serialq.popleft()
        serial.write(outgoing)
        stamp = time.ctime(time.time())
        print("[Arduino | %s] serWrite > %s" % (stamp, outgoing))
    real_delay_call(serWrite, delay, threadName, delay)
开发者ID:kenrick95,项目名称:mdp-algo,代码行数:7,代码来源:server_pi.py
示例9: write_multiple_registers
def write_multiple_registers(serial, data, Params):
    """Handle a write-multiple-registers request and acknowledge it.

    ``data[3]`` selects the target register.  For the numeric registers the
    16-bit value is taken from ``data[7]`` (high byte) and ``data[8]`` (low
    byte); for the two string registers ``data[7:37]`` is converted with
    ``convert_st``.  A fixed response frame, completed by ``add_chksum``, is
    always written back to ``serial``.

    Numeric values are printed as ``value/100 . value%100``, i.e. they are
    treated as integer hundredths.

    Args:
        serial: Port object providing ``write``.
        data: The received request frame (indexable, single-character items).
        Params: Object whose ``in_00``..``in_03``, ``str_00`` and ``str_01``
            attributes are read and updated.
    """
    start_reg = data[3]  # start of the register data
    raw = repr(data[:])

    if start_reg == '\x00':  # pesos (amount)
        Params.in_00 = (ord(data[7]) << 8) + ord(data[8])
        # Cents are zero-padded now (1.05 used to print as "1.5").
        print('Recibido: $ %d.%02d ' % (Params.in_00 // 100, Params.in_00 % 100)
              + ' ' + raw)
        # Litres = amount / price, kept in hundredths.
        # NOTE(review): raises ZeroDivisionError while in_02 is 0.
        Params.in_01 = (Params.in_00 * 100 // Params.in_02 * 100) // 100
    elif start_reg == '\x01':  # litres
        Params.in_01 = (ord(data[7]) << 8) + ord(data[8])
        print('Recibido: %d.%02d Litro(s)' % (Params.in_01 // 100, Params.in_01 % 100)
              + ' ' + raw)
        # Amount = litres * price.
        Params.in_00 = (Params.in_01 * Params.in_02) // 100
    elif start_reg == '\x02':  # Magna price
        Params.in_02 = (ord(data[7]) << 8) + ord(data[8])
        # Amount = litres * price.
        Params.in_00 = (Params.in_01 * Params.in_02) // 100
        print('Recibido: $%d.%02d Magna' % (Params.in_02 // 100, Params.in_02 % 100)
              + ' ' + raw)
    elif start_reg == '\x03':  # Premium price
        Params.in_03 = (ord(data[7]) << 8) + ord(data[8])
        print('Recibido: $%d.%02d Premium' % (Params.in_03 // 100, Params.in_03 % 100)
              + ' ' + raw)
    elif start_reg == '\x1d':  # data string (30 characters)
        Params.str_00 = convert_st(data[7:37])
        print(' '.join(['Cadena 0: ', str(Params.str_00), raw]))
    elif start_reg == '\x3b':  # data string (30 characters)
        Params.str_01 = convert_st(data[7:37])
        print(' '.join(['Cadena 1: ', str(Params.str_01), raw]))
    else:
        print('Recibido ' + ' ' + raw)

    response = add_chksum("\x01\x10\x00\x00\x00\x04")
    serial.write(response)
开发者ID:cristian69,项目名称:KernotekV3,代码行数:30,代码来源:modbus.py
示例10: macro
def macro(code, expected_reply, timeout, error_msg, delay_after):
    """Send a G-code line and wait for the expected reply.

    Args:
        code: Command text; ``"\\r\\n"`` is appended before sending.
        expected_reply: Reply to wait for.  A line matches when it equals this
            value or when its first four characters do.
        timeout: Seconds to wait, measured from just before the write.
        error_msg: Description used in the log; when non-empty, a timeout is
            counted as an error and ``summary()`` is called.
        delay_after: Seconds to sleep after a successful reply.

    Returns:
        The matching reply line on success, False on timeout.
    """
    global s_error
    serial_reply = ""
    macro_start_time = time.time()
    serial.write(code + "\r\n")
    time.sleep(0.3)  # give it some time to start

    while not (serial_reply == expected_reply
               or serial_reply[:4] == expected_reply):
        if time.time() >= macro_start_time + timeout:
            # This assignment was commented out although the next line used
            # the name, so every timeout ended in a NameError.
            trace_msg = ("failed macro (timeout):" + code + " expected "
                         + expected_reply + ", got:" + serial_reply)
            trace(trace_msg, log_trace)
            if not error_msg == "":
                # NOTE(review): indentation was lost in this listing; error
                # counting and summary() are assumed to belong to this branch.
                trace("FAILED : " + error_msg, log_trace)
                s_error += 1
                summary()
            return False  # leave the function
        serial_reply = serial.readline().rstrip()
        time.sleep(0.1)  # no hammering

    trace("OK : " + error_msg, log_trace)
    time.sleep(delay_after)  # wait the desired amount
    return serial_reply
开发者ID:Fensterbank,项目名称:FAB-UI,代码行数:28,代码来源:self_test.py
示例11: _send_and_wait_for_ack
def _send_and_wait_for_ack(self, packet, serial):
    """Send ``packet`` and wait for a one-byte ACK (0x06).

    The packet carries its retry counter as an ASCII digit at offset 4.  The
    packet is sent while no ACK has been received and that counter is below
    ``MAX_RETRIES``; after every send the counter inside the packet is
    incremented.

    The counter was previously read as ``int(chr(packet[4]))`` but written
    back as ``str(int(packet[4]) + 1)``, which cannot both work for the same
    packet type.  It is now read and written consistently for text and byte
    packets alike.

    Args:
        packet: The message to send (str, bytes or bytearray).
        serial: Port object providing ``write(data)`` and ``read(1)``.

    Returns:
        True if 0x06 was received, False otherwise.
    """
    MAX_RETRIES = 1
    ack = 0

    while ack != 0x06:
        counter = packet[4:5]
        if not isinstance(counter, str):
            counter = bytes(counter).decode("ascii")
        retries = int(counter)
        if retries >= MAX_RETRIES:
            break

        serial.write(packet)
        reply = serial.read(1)

        # The retry counter is at a fixed position in the high level message,
        # so it is incremented here rather than sending the packet back to
        # the client just to bump it.
        bumped = str(retries + 1)
        if not isinstance(packet, str):
            bumped = bumped.encode("ascii")
        packet = packet[:4] + bumped + packet[5:]

        if reply:
            ack = ord(reply)
        else:
            _logger.warning("did not get ACK, retrying...")
            ack = 0

    if ack == 0x06:
        return True
    _logger.error("retried " + str(MAX_RETRIES) + " times without receiving ACK, is blackbox properly connected?")
    return False
开发者ID:chaudhary4k4,项目名称:odoo9,代码行数:27,代码来源:main.py
示例12: summary
def summary():
    """Log the self-test result, optionally upload it, then finalize and exit.

    Writes the error count to the trace log.  When ``online == 1`` the log
    file is posted to the remote report server.  When errors occurred, the
    extruder and bed heaters are switched off.  Finally the serial port is
    flushed and closed, the status file is written, the ``finalize.php``
    script is run for ``task_id`` and the process exits.
    """
    trace("SUMMARY:", log_trace)
    trace("Errors : " + str(s_error), log_trace)
    if s_error == 0:
        trace("Result : No errors occurred.", log_trace)
    else:
        trace("Self Test FAILED", log_trace)

    if online == 1:
        print("Saving results on remote server")
        url = 'http://plm.fabtotum.com/reports/add_report.php'
        info = {'ID': controller_serial_id, 'result': s_error}
        # `with` closes the report file; it used to be left open.
        with open(log_trace, 'rb') as report:
            try:
                requests.post(url, info, files={'file': report})
                trace("Self Test results saved online", log_trace)
            except Exception:
                trace("Could not contact remote support server, is Internet connectivity available?", log_trace)

    if s_error > 0:
        # shutdown temps
        serial.write("M104 S0\r\n")  # shutdown extruder (fast)
        serial.write("M140 S0\r\n")  # shutdown bed (fast)

    # NOTE(review): indentation was lost in this listing; the clean-up below
    # is assumed to run unconditionally, with only the heater shutdown
    # depending on s_error - confirm against the original file.
    serial.flush()
    serial.close()
    write_json('1', status_json)
    # Argument list without a shell: same command, no quoting pitfalls.
    call(['sudo', 'php', '/var/www/fabui/script/finalize.php',
          str(task_id), 'self_test'])
    sys.exit()
开发者ID:Fensterbank,项目名称:FAB-UI,代码行数:33,代码来源:self_test.py
示例13: dlt645_get_addr
def dlt645_get_addr(serial):
    """Ask a DL/T645 meter for its address using the broadcast address.

    Sends control code 0x13 to address ``AA AA AA AA AA AA`` (prefixed with
    four 0xFE bytes) and reads the reply one byte at a time until a 0x16 byte
    is seen or ``SERIAL_TIMEOUT_CNT`` empty reads have occurred.

    Args:
        serial: Port object providing ``write(data)`` and ``read(1)``.

    Returns:
        ``(ret, addr)`` when decoding succeeds (``ret == 0``);
        ``(-1, 0)`` on timeout; ``(-1, '')`` on a decode error or exception.
        A decode error previously fell off the end and returned None.
    """
    try:
        cmd2 = '\xfe\xfe\xfe\xfe' + encode_dlt645('\xaa\xaa\xaa\xaa\xaa\xaa', 0x13, 0, '')
        serial.write(cmd2)
        time.sleep(0.5)

        resp = ''
        c = ''
        i = 0
        while c != '\x16' and i < SERIAL_TIMEOUT_CNT:
            c = serial.read(1)
            if len(c) > 0:
                resp += c
            else:
                print('.')
                # NOTE(review): indentation was lost in this listing; the
                # counter is assumed to count empty reads only - confirm.
                i += 1
        if i >= SERIAL_TIMEOUT_CNT:
            return -1, 0

        resp1 = dlt_645_rm_fe(resp)
        ret, addr, data, ctl = decode_dlt645(resp1)
        if ret == 0:
            return ret, addr
        # Always hand back a tuple so callers can unpack the result.
        return -1, ''
    except Exception:
        print('dlt645_get_addr exception!')
        return -1, ''
开发者ID:aixiwang,项目名称:python_dlt645,代码行数:34,代码来源:read_dlt645.py
示例14: change_speed
def change_speed(motor_num, speed):
    """Send a signed-speed command for one motor.

    The frame written to the motor's serial port, one byte per write, is
    ``[board address, command, speed as 4 bytes big-endian, checksum]`` where
    the checksum is the sum of the preceding bytes masked with 0x7F.

    Args:
        motor_num: 1, 2 or 3.
        speed: Target speed; converted with ``int()``.

    Returns:
        0 after the frame has been written, -1 when ``motor_num`` is not
        recognised.  That case previously printed the message and then
        crashed on unbound locals.
    """
    speed = int(speed)
    if motor_num == 1:
        board_num, command, port = 0x80, 35, ser_x80
    elif motor_num == 2:
        board_num, command, port = 0x81, 36, ser_x81
    elif motor_num == 3:
        board_num, command, port = 0x80, 36, ser_x80
    else:
        print("Not a possibility.")
        return -1

    # Most significant byte first.
    speed_bytes = [(speed >> shift) & 0xff for shift in (24, 16, 8, 0)]
    checksum = (board_num + command + sum(speed_bytes)) & 0x7f
    for value in [board_num, command] + speed_bytes + [checksum]:
        port.write(chr(value))
    return 0
开发者ID:speedyswimmer1000,项目名称:roboSoccer,代码行数:31,代码来源:s.py
示例15: faceDetection
def faceDetection():
    """Capture a frame, report the first detected face, and reschedule.

    A picture is captured to ``imagePath``, converted to grayscale and run
    through ``faceCascade``.  For the first face found, its centre is printed
    and written to ``serial`` as ``"<x>, <y>"``, a rectangle is drawn around
    it and the annotated image is saved to ``imageFaceDetectionPath``.  The
    function then schedules itself again after ``refreshTime``.
    """
    camera.capture(imagePath)

    # Read the image
    image = cv2.imread(imagePath)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect faces in the image
    faces = faceCascade.detectMultiScale(
        gray,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv2.cv.CV_HAAR_SCALE_IMAGE
    )

    for (x, y, w, h) in faces:
        centerX = x + (w // 2)
        centerY = y + (h // 2)
        # Both coordinates are reported now; "{0}, {0}" repeated centerX and
        # dropped centerY.
        position = "{0}, {1}".format(centerX, centerY)
        print("Found face at: " + position)
        serial.write(position)
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 3)
        cv2.imwrite(imageFaceDetectionPath, image)
        break  # only the first face is handled

    # schedule the next run
    interval.enter(refreshTime, 1, faceDetection, ())
开发者ID:Coderwelsch,项目名称:IO-Motor-Poster,代码行数:30,代码来源:face-detection.py
示例16: sendReceiveScript
def sendReceiveScript(serial, filename='init.lua'):
    """Send the Lua snippet that puts the device into file-receive mode.

    The Lua template below is collapsed to a single line, its two ``%s``
    placeholders are filled with ``filename``, a carriage return is appended
    and the result is written to ``serial``.  The function then waits for
    ``prompt``; the response is not validated.

    Args:
        serial: Port object providing ``write``.
        filename: Name of the file the device should create.

    Raises:
        AssertionError: If the assembled command is 256 characters or longer.
    """
    lua_template = r"""tmr.stop(0)
file.remove('%s')
file.open('%s', 'w')
uart.on('data', 255,
function (d)
tmr.stop(0)
c = tonumber(d:sub(1, 4))
d = d:sub(5, 4+c)
file.write(d)
if c ~= 251 then
file.close()
uart.write(0, 'Y')
return
end
uart.write(0, 'Y')
end, 0)"""
    pieces = (piece.strip() for piece in lua_template.split('\n'))
    one_line = ' '.join(pieces).replace(', ', ',')
    save_command = one_line % (filename, filename) + '\r'
    command_length = len(save_command)
    assert command_length < 256, 'Save_command too long: %s bytes' % command_length

    writeV('Sending receive script to NodeMCU...')
    serial.write(save_command.encode())
    waitFor(prompt)
    writeLnV('ok!')
开发者ID:bepursuant,项目名称:SublimeNodeMCU,代码行数:29,代码来源:SublimeNodeMCU.py
示例17: send_message
def send_message(serial, msg_type, msg):
    """Write one framed debug message to ``serial``.

    The frame is four single-character header writes - ``DEBUG_MAGIC_1``,
    ``DEBUG_MAGIC_2``, ``msg_type`` and ``len(msg)``, each converted with
    ``chr`` - followed by ``msg`` itself.

    Args:
        serial: Port object providing ``write``.
        msg_type: Integer message identifier.
        msg: Payload to send after the header.
    """
    payload_len = len(msg)
    print("Sending, id=0x%02X, len=%d" % (msg_type, payload_len))
    for header_value in (DEBUG_MAGIC_1, DEBUG_MAGIC_2, msg_type, payload_len):
        serial.write(chr(header_value))
    serial.write(msg)
开发者ID:peddie,项目名称:Swift-Nav-Code,代码行数:7,代码来源:spectrum.py
示例18: macro
def macro(code, expected_reply, timeout, error_msg, delay_after, warning=False, verbose=True):
    """Send a G-code line and wait for the expected reply.

    The input buffer is always flushed first.  If an earlier macro already
    failed (``s_error != 0``) the command is skipped.

    Args:
        code: Command text; ``"\\r\\n"`` is appended before sending.
        expected_reply: Reply to wait for.  A line matches when it equals this
            value or when its first four characters do.
        timeout: Seconds to wait; five extra seconds are added.
        error_msg: Description used in trace messages.
        delay_after: Seconds to sleep after a successful reply.
        warning: When true, a timeout increments ``s_warning`` instead of
            ``s_error``.
        verbose: When true, ``error_msg`` is traced before waiting.

    Returns:
        The matching reply line on success; False when skipped or timed out.
    """
    global s_error, s_warning, s_skipped

    serial.flushInput()
    if s_error != 0:
        trace(error_msg + ": Skipped")
        s_skipped += 1
        return False

    reply = ""
    started_at = time.time()
    serial.write(code + "\r\n")
    if verbose:
        trace(error_msg)
    time.sleep(0.3)  # give it some time to start

    while reply != expected_reply and reply[:4] != expected_reply:
        timed_out = time.time() >= started_at + timeout + 5
        if timed_out:
            if reply == "":
                reply = "<nothing>"
            if warning:
                s_warning += 1
                trace(error_msg + ": Warning! ")
            else:
                s_error += 1
                trace(error_msg + ": Failed (" + reply + ")")
            return False  # leave the function
        reply = serial.readline().rstrip()
        time.sleep(0.2)  # no hammering

    time.sleep(delay_after)  # wait the desired amount
    return reply
开发者ID:AungWinnHtut,项目名称:colibri-fabui,代码行数:35,代码来源:manual_bed_lev.py
示例19: _worksession_start
def _worksession_start(self, seconds):
    """Send the work-session length, in whole minutes, as a single character.

    Args:
        seconds: Session length in seconds; converted with ``int()`` and
            floor-divided by 60.

    Raises:
        ValueError: From ``chr`` when the minute count is outside the range it
            accepts.
    """
    # Explicit floor division keeps the integer result on every Python
    # version.  The payload is built before the port is opened, so a bad
    # argument no longer leaves the port open.
    minutes = int(seconds) // 60
    data = chr(minutes)

    serial = self._openSerial()
    try:
        serial.write(data)
    finally:
        # Close the port even if the write fails.
        self._closeSerial(serial)
开发者ID:glibersat,项目名称:pomarduino,代码行数:8,代码来源:__init__.py
示例20: query_arduino
def query_arduino():
    """Request one reading from the Arduino and store it as a CSV line.

    Writes ``'1'`` to the port on ``/dev/ttyACM0`` (9600 baud), reads one
    reply line, splits it on whitespace and writes the fields, joined with
    commas, to ``/etc/scripts/.arduino.db``.

    Side effect: the module-level name ``serial`` is rebound from the serial
    module to the opened port, as before.
    """
    global serial
    # The first call replaces the module with the port object.  On later
    # calls `serial` is already the port, which has no `Serial` attribute, so
    # the open port is reused instead of failing.
    if hasattr(serial, 'Serial'):
        serial = serial.Serial('/dev/ttyACM0', 9600)

    serial.write('1')
    query = serial.readline().strip('\r\n').split()
    with open('/etc/scripts/.arduino.db', 'wb') as fo:
        fo.write(','.join(query))
开发者ID:deflax,项目名称:sysadmin-howtos,代码行数:8,代码来源:arduino.py
注:本文中的serial.write函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论