本文整理汇总了Python中repeated_timer.RepeatedTimer类的典型用法代码示例。如果您正苦于以下问题:Python RepeatedTimer类的具体用法?Python RepeatedTimer怎么用?Python RepeatedTimer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RepeatedTimer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: DashieSampler
class DashieSampler(object):
def __init__(self, app, interval):
self._app = app
self._timer = RepeatedTimer(interval, self._sample)
def stop(self):
self._timer.stop()
def name(self):
'''
Child class implements this function
'''
return 'UnknownSampler'
def sample(self):
'''
Child class implements this function
'''
return {}
def _send_event(self, widget_id, body):
body['id'] = widget_id
body['updateAt'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S +0000')
formatted_json = 'data: %s\n\n' % (json.dumps(body))
self._app.last_events[widget_id] = formatted_json
for event_queue in self._app.events_queue.values():
event_queue.put(formatted_json)
def _sample(self):
data = self.sample()
if data:
self._send_event(self.name(), data)
开发者ID:pmarti,项目名称:pydashie,代码行数:32,代码来源:dashie_sampler.py
示例2: __init__
class DatDashSampler:
def __init__(self, app, interval):
self._app = app
self._timer = RepeatedTimer(interval, self._sample)
def stop(self):
self._timer.stop()
def name(self):
"""
Child class implements this function
"""
return "UnknownSampler"
def sample(self):
"""
Child class implements this function
"""
return {}
def _send_event(self, widget_id, body):
body["id"] = widget_id
body["updateAt"] = \
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S +0000")
formatted_json = "data: %s\n\n" % (json.dumps(body))
self._app.last_events[widget_id] = formatted_json
for event_queue in self._app.events_queue.values():
event_queue.put(formatted_json)
def _sample(self):
data = self.sample()
if data:
self._send_event(self.name(), data)
开发者ID:LuRsT,项目名称:datdash,代码行数:35,代码来源:datdash_sampler.py
示例3: __init__
class MultiTouchInput:
current_state = 0
current_state_counter = [0]*12
touch_timer = None
def __init__(self, ipcon, key_queue):
if not config.UID_MULTI_TOUCH_BRICKLET:
print("Not Configured: Multi Touch")
return
self.key_queue = key_queue
self.ipcon = ipcon
self.mt = MultiTouch(config.UID_MULTI_TOUCH_BRICKLET, self.ipcon)
try:
self.mt.get_electrode_sensitivity()
print("Found: Multi Touch ({0})").format(config.UID_MULTI_TOUCH_BRICKLET)
except:
print("Not Found: Multi Touch ({0})").format(config.UID_MULTI_TOUCH_BRICKLET)
return
self.mt.set_electrode_sensitivity(100)
self.mt.register_callback(self.mt.CALLBACK_TOUCH_STATE, self.cb_touch_state)
self.touch_timer = RepeatedTimer(0.1, self.touch_tick)
def stop(self):
if self.touch_timer is not None:
self.touch_timer.stop()
def state_to_queue(self, state):
for item in config.KEYMAP_MULTI_TOUCH.items():
if state & (1 << item[0]):
self.key_queue.put(item[1])
def touch_tick(self):
state = 0
for i in range(12):
if self.current_state & (1 << i):
self.current_state_counter[i] += 1
else:
self.current_state_counter[i] = 0
if self.current_state_counter[i] > 5:
state |= (1 << i)
if state != 0:
self.state_to_queue(state)
def cb_touch_state(self, state):
changed_state = self.current_state ^ state
self.current_state = state
self.state_to_queue(changed_state & self.current_state)
开发者ID:chuangke365,项目名称:blinkenlights,代码行数:54,代码来源:keypress.py
示例4: on_complete
def on_complete(self):
self.animation_loader.on_complete()
self.interval_animation.stop()
if os.path.isfile(self.NODE_JS_BINARY_TARFILE_FULL_PATH) :
os.remove(self.NODE_JS_BINARY_TARFILE_FULL_PATH)
node_js = NodeJS()
npm = NPM()
self.animation_loader = AnimationLoader(["[= ]", "[ = ]", "[ = ]", "[ = ]", "[ =]", "[ = ]", "[ = ]", "[ = ]"], 0.067, "Installing npm dependencies ")
self.interval_animation = RepeatedTimer(self.animation_loader.sec, self.animation_loader.animate)
try :
npm.getCurrentNPMVersion(True)
except Exception as e:
if node_variables.NODE_JS_OS == "win" :
sublime.error_message("Can't use \"npm\"! To use features that requires \"npm\", you must install it! Download it from https://nodejs.org site")
print("Error: "+traceback.format_exc())
try :
npm.install_all()
except Exception as e :
#print("Error: "+traceback.format_exc())
pass
self.animation_loader.on_complete()
self.interval_animation.stop()
if node_js.getCurrentNodeJSVersion(True) == self.NODE_JS_VERSION :
sublime.active_window().status_message("Node.js "+self.NODE_JS_VERSION+" installed correctly! NPM version: "+npm.getCurrentNPMVersion(True))
else :
sublime.active_window().status_message("Can't install Node.js! Something went wrong during installation.")
开发者ID:phillipoertel,项目名称:st3_config,代码行数:26,代码来源:installer.py
示例5: __init__
def __init__(self, *args, **kwargs):
tk.Tk.__init__(self, *args, **kwargs)
tk.Tk.title(self, 'OpenSlides')
tk.Tk.resizable(self, width=False, height=False)
self.protocol('WM_DELETE_WINDOW', self.on_close_app)
self.root = tk.Frame(self, bg=self.cget('bg'))
self.root.pack(side="top", fill="both", expand = True)
self.btn_settings = None
self.btn_db_backup = None
self.btn_sync_db = None
self.btn_reset_admin = None
self.chb_auto_svr = None
self.btn_server = None
self.backup_timer = RepeatedTimer(self.on_backup_timer)
self.lbl_host = None
self.lbl_port = None
if not self.initialize_gui():
self.quit()
self.createMenuBar()
self.createMainframe()
开发者ID:frauenknecht,项目名称:openslides-gui-mac,代码行数:27,代码来源:gui.py
示例6: updateNPMDependencies
def updateNPMDependencies():
npm = NPM()
try :
npm.getCurrentNPMVersion(True)
except Exception as e:
print("Error: "+traceback.format_exc())
return
animation_loader = AnimationLoader(["[= ]", "[ = ]", "[ = ]", "[ = ]", "[ =]", "[ = ]", "[ = ]", "[ = ]"], 0.067, "Updating npm dependencies ")
interval_animation = RepeatedTimer(animation_loader.sec, animation_loader.animate)
try :
npm.update_all(False)
except Exception as e:
pass
animation_loader.on_complete()
interval_animation.stop()
开发者ID:jordanwade,项目名称:dotfiles,代码行数:16,代码来源:installer.py
示例7: run_game_loop
def run_game_loop(self):
self.frame_rendered(0)
self.timer = RepeatedTimer(0.1, self.tick)
while self.loop:
key = self.kp.read_single_keypress().lower()
if key == 'a':
self.move_paddle(0, -1)
elif key == 's':
self.move_paddle(0, 1)
elif key == 'k':
self.move_paddle(1, -1)
elif key == 'l':
self.move_paddle(1, 1)
elif key == 'r':
self.init_game()
elif not config.HAS_GUI and key == 'q':
break
if not config.IS_LED_STRIP_V2:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_RENDERED, None)
else:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_STARTED, None)
self.timer.stop()
self.kp.stop()
开发者ID:Tinkerforge,项目名称:blinkenlights,代码行数:28,代码来源:pong.py
示例8: run_game_loop
def run_game_loop(self):
self.frame_rendered(0)
self.drop_timer = RepeatedTimer(1.0, self.drop_tetromino)
while self.loop:
key = self.kp.read_single_keypress()
if key == 'a':
self.speaker.beep_input()
self.move_tetromino(0, -1, self.tetromino_form)
elif key == 'd':
self.speaker.beep_input()
self.move_tetromino(0, 1, self.tetromino_form)
elif key == 's':
self.speaker.beep_input()
self.move_tetromino(1, 0, self.tetromino_form)
elif key == 'k':
self.speaker.beep_input()
self.move_tetromino(0, 0, (self.tetromino_form-1) % 4)
elif key == 'l':
self.speaker.beep_input()
self.move_tetromino(0, 0, (self.tetromino_form+1) % 4)
elif key == 'r':
self.init_game()
elif not config.HAS_GUI and key == 'q':
break
if not config.IS_LED_STRIP_V2:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_RENDERED, None)
else:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_STARTED, None)
self.drop_timer.stop()
self.kp.stop()
开发者ID:Tinkerforge,项目名称:blinkenlights,代码行数:35,代码来源:tetris.py
示例9: updateNPMDependencies
def updateNPMDependencies():
npm = NPM()
try :
npm.getCurrentNPMVersion(True)
except Exception as e:
if node_variables.NODE_JS_OS == "win" :
sublime.active_window().status_message("Can't use \"npm\"! To use features that requires \"npm\", you must install it! Download it from https://nodejs.org site")
print("Error: "+traceback.format_exc())
return
animation_loader = AnimationLoader(["[= ]", "[ = ]", "[ = ]", "[ = ]", "[ =]", "[ = ]", "[ = ]", "[ = ]"], 0.067, "Updating npm dependencies ")
interval_animation = RepeatedTimer(animation_loader.sec, animation_loader.animate)
try :
npm.update_all(False)
except Exception as e:
pass
animation_loader.on_complete()
interval_animation.stop()
开发者ID:phillipoertel,项目名称:st3_config,代码行数:18,代码来源:installer.py
示例10: __init__
def __init__(self, msg_func, master):
self.msg_func = msg_func
self.master = master
self.child_process = None
self.output_queue = queue.Queue()
self.output_read_thread = None
self.output_mutex = threading.RLock()
self.update_timer = RepeatedTimer(self.on_update_timer)
self._exitcode = None
开发者ID:frauenknecht,项目名称:openslides-gui-mac,代码行数:9,代码来源:command_runner.py
示例11: start
def start(self, info):
logging.info('Master Starts')
self.last_wake_time = datetime.datetime.utcnow()
self.is_started = True
self.pop_forever_handler = threading.Thread(target=self.start_popping_tasks)
self.pop_forever_handler.start()
self.repeated_timer = RepeatedTimer(self.clean_time_gap, self.notice_refresh, None)
开发者ID:habemusne,项目名称:flexibloader,代码行数:9,代码来源:master.py
示例12: __init__
def __init__(self):
# GPIO番号でピンを指定
# ServoBlasterの起動(rootで実行する必要あり)
# 50%指定時の中間位置を--maxで調整する。--max=200がちょうどよかった
os.system('sudo /home/pi/PiBits/ServoBlaster/user/servod --idle-timeout=2000 --max=200')
self.servos = []
for index in RPiServoblasterController.PIN_MAP.iterkeys():
pin = RPiServoblasterController.PIN_MAP[index]
# サーボを作る
self.servos.append(SG90Servoblaster(pin, self.getPartName(index)))
# self.positionの内容を定期的にcommit()を使ってservoblasterで反映する
self.positions = []
self.timer = RepeatedTimer(RPiServoblasterController.COMMIT_INTERVAL_SEC, self.commit)
self.timer.start()
开发者ID:maimuzo,项目名称:mearm-ps3controller-for-raspberrypi,代码行数:16,代码来源:rpi_servoblaster_controller.py
示例13: __init__
def __init__(self, address=0x40):
# I2C経由でのコントロール用ライブラリ
# https://github.com/adafruit/Adafruit-Raspberry-Pi-Python-Code.git
self.pwm = Adafruit_PCA9685.PCA9685(address, busnum=1)
self.pwm.set_pwm_freq(SG90PCA9685.PWM_FREQ)
self.servos = []
for index in RPiPCA9685Controller.PIN_MAP.iterkeys():
pin = RPiPCA9685Controller.PIN_MAP[index]
# サーボを作る
self.servos.append(SG90PCA9685(pin, self.getPartName(index), self.pwm))
# self.positionの内容を定期的にcommit()を使ってservoblasterで反映する
self.positions = []
self.timer = RepeatedTimer(RPiPCA9685Controller.COMMIT_INTERVAL_SEC, self.commit)
self.timer.start()
开发者ID:maimuzo,项目名称:mearm-ps3controller-for-raspberrypi,代码行数:17,代码来源:rpi_pca9685_controller.py
示例14: start_timer
def start_timer(self):
self.threading_timer = RepeatedTimer(3, self.refresh)
self.threading_timer.start()
开发者ID:jordangriffiths01,项目名称:betting-display,代码行数:3,代码来源:betting_display.py
示例15: BettingDisplay
class BettingDisplay():
def __init__(self, parent, meeting):
self.parent = parent
self.display_type = 'odds'
self.date = datetime.date.today().strftime("%Y-%m-%d") #Currently will need to be restarted each day
self.meeting = meeting #PLACEHOLDER
self.schedule = Schedule(self.date, self.meeting)
self.races = self.schedule.meetings[0].races
self.next_race = None
self.set_next_race()
self.odds_var = StringVar()
self.title_var = StringVar()
self.dets_var = StringVar()
self.build_display()
self.start_timer()
print("STARTED SUCCESSFULLY")
def set_next_race(self):
next_race = None
found = False
for race in self.races:
cur_time = int(datetime.datetime.now().strftime("%H%M%S"))
race_time = int(re.sub('[:]', '', race.time))
if race_time > cur_time and not found:
next_race = race
found = True
if next_race is not self.next_race:
self.next_race = next_race
def start_timer(self):
self.threading_timer = RepeatedTimer(3, self.refresh)
self.threading_timer.start()
def refresh(self):
if self.display_type == 'odds':
self.set_next_race()
self.next_race.load_odds()
#---TEMP---#
horse_nums = ['']*20
for i in range(min(20, len(self.next_race.entries))):
horse_nums[i] = str(self.next_race.entries[i].number)
horse_names = ['']*20
for i in range(min(20, len(self.next_race.entries))):
horse_names[i] = str(self.next_race.entries[i].name)
win_odds = ['']*20
for i in range(min(20, len(self.next_race.entries))):
win_odds[i] = str(self.next_race.entries[i].odds_win)
lst = horse_nums + horse_names + win_odds
odds_str = TEST_TEMPLATE.format(lst)
title_str = TITLE_TEMPLATE.format(name=self.next_race.name)
dets_str = TIME_TEMPLATE.format(time=self.next_race.time, venue=self.next_race.meeting.venue, meet_no=self.next_race.meeting.number, country=self.next_race.meeting.country)
self.title_var.set(title_str)
self.dets_var.set(dets_str)
self.odds_var.set(odds_str)
#---TEMP END---#
elif self.display_type == 'test':
self.title_var.set('test')
self.dets_var.set('')
self.odds_var.set('')
def build_display(self):
#----TEMP----
self.cur_race_name = StringVar()
self.cur_race_time = StringVar()
self.title_text = Label(self.parent, fg="white", bg="black", font=("Courier", 40, "bold"), textvariable=self.title_var)
self.title_text.place(relx = 0.5, rely = 0, anchor=N, height = 80, width=1100)
self.dets_text = Label(self.parent, textvariable=self.dets_var, fg="white", bg="black", font=("Courier", 20, "bold"))
self.dets_text.place(relx = 0.5, y = 80, anchor=N, height = 30, width=1100)
self.odds_text = Label(self.parent, textvariable=self.odds_var, fg="white", bg="black", font=("Courier", 20, "bold"))
self.odds_text.place(relx = 0.5, y = 110, anchor=N, width=1100, height = 600)
self.quitbutton = Button(self.parent, text='quit', command=self.quitclick)
self.quitbutton.place(relx = 0.5, rely = 1, anchor=S)
#---TEMP END ---#
def quitclick(self):
self.threading_timer.stop()
self.parent.destroy()
开发者ID:jordangriffiths01,项目名称:betting-display,代码行数:98,代码来源:betting_display.py
示例16: start
def start(self):
self.thread = create_and_start_thread(self.download, "DownloadNodeJS")
if self.animation_loader :
self.interval_animation = RepeatedTimer(self.animation_loader.sec, self.animation_loader.animate)
开发者ID:jordanwade,项目名称:dotfiles,代码行数:4,代码来源:installer.py
示例17: DownloadNodeJS
class DownloadNodeJS(object):
def __init__(self, node_version):
self.NODE_JS_VERSION = node_version
self.NODE_JS_TAR_EXTENSION = ".zip" if node_variables.NODE_JS_OS == "win" else ".tar.gz"
self.NODE_JS_BINARY_URL = "https://nodejs.org/dist/"+self.NODE_JS_VERSION+"/node-"+self.NODE_JS_VERSION+"-"+node_variables.NODE_JS_OS+"-"+node_variables.NODE_JS_ARCHITECTURE+self.NODE_JS_TAR_EXTENSION
self.NODE_JS_BINARY_TARFILE_NAME = self.NODE_JS_BINARY_URL.split('/')[-1]
self.NODE_JS_BINARY_TARFILE_FULL_PATH = os.path.join(node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM, self.NODE_JS_BINARY_TARFILE_NAME)
self.animation_loader = AnimationLoader(["[= ]", "[ = ]", "[ = ]", "[ = ]", "[ =]", "[ = ]", "[ = ]", "[ = ]"], 0.067, "Downloading: "+self.NODE_JS_BINARY_URL+" ")
self.interval_animation = None
self.thread = None
def download(self):
try :
if os.path.exists(node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM):
self.rmtree(node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
os.makedirs(node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
else :
os.makedirs(node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
if os.path.exists(node_variables.NODE_MODULES_PATH):
self.rmtree(node_variables.NODE_MODULES_PATH)
request = urllib.request.Request(self.NODE_JS_BINARY_URL)
request.add_header('User-agent', r'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1')
with urllib.request.urlopen(request) as response :
with open(self.NODE_JS_BINARY_TARFILE_FULL_PATH, 'wb') as out_file :
shutil.copyfileobj(response, out_file)
except Exception as err :
traceback.print_exc()
self.on_error(err)
return
self.extract()
self.on_complete()
def start(self):
self.thread = create_and_start_thread(self.download, "DownloadNodeJS")
if self.animation_loader :
self.interval_animation = RepeatedTimer(self.animation_loader.sec, self.animation_loader.animate)
def extract(self):
sep = os.sep
if self.NODE_JS_TAR_EXTENSION != ".zip" :
with tarfile.open(self.NODE_JS_BINARY_TARFILE_FULL_PATH, "r:gz") as tar :
for member in tar.getmembers() :
member.name = sep.join(member.name.split(sep)[1:])
tar.extract(member, node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
else :
if node_variables.NODE_JS_OS == "win" :
import string
from ctypes import windll, c_int, c_wchar_p
UNUSUED_DRIVE_LETTER = ""
for letter in string.ascii_uppercase:
if not os.path.exists(letter+":") :
UNUSUED_DRIVE_LETTER = letter+":"
break
if not UNUSUED_DRIVE_LETTER :
sublime.message_dialog("Can't install node.js and npm! UNUSUED_DRIVE_LETTER not found.")
return
DefineDosDevice = windll.kernel32.DefineDosDeviceW
DefineDosDevice.argtypes = [ c_int, c_wchar_p, c_wchar_p ]
DefineDosDevice(0, UNUSUED_DRIVE_LETTER, node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
try:
with zipfile.ZipFile(self.NODE_JS_BINARY_TARFILE_FULL_PATH, "r") as zip_file :
for member in zip_file.namelist() :
if not member.endswith("/") :
with zip_file.open(member) as node_file:
with open(UNUSUED_DRIVE_LETTER + "\\"+ member.replace("node-"+self.NODE_JS_VERSION+"-"+node_variables.NODE_JS_OS+"-"+node_variables.NODE_JS_ARCHITECTURE+"/", ""), "wb+") as target :
shutil.copyfileobj(node_file, target)
elif not member.endswith("node-"+self.NODE_JS_VERSION+"-"+node_variables.NODE_JS_OS+"-"+node_variables.NODE_JS_ARCHITECTURE+"/"):
os.mkdir(UNUSUED_DRIVE_LETTER + "\\"+ member.replace("node-"+self.NODE_JS_VERSION+"-"+node_variables.NODE_JS_OS+"-"+node_variables.NODE_JS_ARCHITECTURE+"/", ""))
except Exception as e:
print("Error: "+traceback.format_exc())
finally:
DefineDosDevice(2, UNUSUED_DRIVE_LETTER, node_variables.NODE_JS_BINARIES_FOLDER_PLATFORM)
def rmtree(self, path) :
if node_variables.NODE_JS_OS == "win" :
import string
from ctypes import windll, c_int, c_wchar_p
UNUSUED_DRIVE_LETTER = ""
for letter in string.ascii_uppercase:
if not os.path.exists(letter+":") :
UNUSUED_DRIVE_LETTER = letter+":"
break
if not UNUSUED_DRIVE_LETTER :
sublime.message_dialog("Can't remove node.js! UNUSUED_DRIVE_LETTER not found.")
return
DefineDosDevice = windll.kernel32.DefineDosDeviceW
DefineDosDevice.argtypes = [ c_int, c_wchar_p, c_wchar_p ]
DefineDosDevice(0, UNUSUED_DRIVE_LETTER, path)
try:
shutil.rmtree(UNUSUED_DRIVE_LETTER)
except Exception as e:
print("Error: "+traceback.format_exc())
finally:
DefineDosDevice(2, UNUSUED_DRIVE_LETTER, path)
else :
shutil.rmtree(path)
def on_error(self, err):
self.animation_loader.on_complete()
self.interval_animation.stop()
sublime.active_window().status_message("Can't install Node.js! Check your internet connection!")
def on_complete(self):
#.........这里部分代码省略.........
开发者ID:jordanwade,项目名称:dotfiles,代码行数:101,代码来源:installer.py
示例18: __init__
def __init__(self, app, interval):
self._app = app
self._timer = RepeatedTimer(interval, self._sample)
开发者ID:pmarti,项目名称:pydashie,代码行数:3,代码来源:dashie_sampler.py
示例19: len
#.........这里部分代码省略.........
break
if to_clear:
rows_to_clear.append(row_index+1)
if len(rows_to_clear) > 0:
self.clear_lines(rows_to_clear)
def new_tetromino(self):
self.add_tetromino_to_field(self.playfield, self.tetromino_pos_row,
self.tetromino_pos_col, self.tetromino_current,
self.tetromino_form)
self.tetromino_current = None
self.check_for_lines_to_clear()
self.tetromino_current = self.get_random_tetromino()
self.tetromino_pos_row = self.FIELD_ROW_START
self.tetromino_pos_col = self.FIELD_COL_START
self.tetromino_form = 0
if not self.tetromino_fits(self.playfield, self.tetromino_pos_row,
self.tetromino_pos_col, self.tetromino_current,
self.tetromino_form):
self.is_game_over = True
self.game_over_position = 0
self.drop_timer.interval = 0.15
def next_game_over_step(self):
for row in range(len(self.GAME_OVER_TEXT)):
for col in range(config.LED_COLS):
k = (self.game_over_position+col) % len(self.GAME_OVER_TEXT[0])
self.playfield[7+row][1+col] = self.GAME_OVER_COLORS[self.GAME_OVER_TEXT[row][k]]
self.game_over_position += 1
def drop_tetromino(self):
if self.is_game_over:
self.next_game_over_step()
return
if self.tetromino_fits(self.playfield, self.tetromino_pos_row+1,
self.tetromino_pos_col, self.tetromino_current,
self.tetromino_form):
self.tetromino_pos_row += 1
else:
self.new_tetromino()
def move_tetromino(self, row, col, form):
if self.is_game_over:
return
if self.tetromino_fits(self.playfield, self.tetromino_pos_row+row,
self.tetromino_pos_col+col, self.tetromino_current, form):
self.tetromino_pos_row += row
self.tetromino_pos_col += col
self.tetromino_form = form
if row > 0:
# restart drop timer, so we don't drop two tetrominos in a row
self.drop_timer.stop()
self.drop_timer.start()
elif row == 1: # user is at bottom and hits button to go down again
self.new_tetromino()
def run_game_loop(self):
self.frame_rendered(0)
self.drop_timer = RepeatedTimer(1.0, self.drop_tetromino)
while self.loop:
key = self.kp.read_single_keypress()
if key == 'a':
self.speaker.beep_input()
self.move_tetromino(0, -1, self.tetromino_form)
elif key == 'd':
self.speaker.beep_input()
self.move_tetromino(0, 1, self.tetromino_form)
elif key == 's':
self.speaker.beep_input()
self.move_tetromino(1, 0, self.tetromino_form)
elif key == 'k':
self.speaker.beep_input()
self.move_tetromino(0, 0, (self.tetromino_form-1) % 4)
elif key == 'l':
self.speaker.beep_input()
self.move_tetromino(0, 0, (self.tetromino_form+1) % 4)
elif key == 'r':
self.init_game()
elif not config.HAS_GUI and key == 'q':
break
if not config.IS_LED_STRIP_V2:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_RENDERED, None)
else:
self.led_strip.register_callback(self.led_strip.CALLBACK_FRAME_STARTED, None)
self.drop_timer.stop()
self.kp.stop()
开发者ID:Tinkerforge,项目名称:blinkenlights,代码行数:101,代码来源:tetris.py
示例20: msg_callback
#.........这里部分代码省略.........
self.slave_num_every_packup = self.config.get('main', 'slave_num_every_packup')
self.slave_max_sec_each_task = self.config.get('main', 'slave_max_sec_each_task')
self.slave_python_version = self.config.get('main', 'slave_python_version')
def add_slave(self, slave_info):
if self.slave_manager.exist_slave(slave_info):
logging.info('Slave ' + slave_info['host'] + ' already exists.')
return
logging.info('master: add slave' + str(slave_info))
new_slave_info = self.slave_manager.add_slave(slave_info)
self.slave_manager.run_slave(new_slave_info)
# TODO:
def kill_slave(self, slave_info):
if not self.slave_manager.exist_slave(slave_info):
return
logging.info('kill slave ' + str(slave_info))
self.slave_manager.kill_slave(slave_info)
def restart_slave(self, slave_info):
logging.info(slave_info['host'])
logging.info('restart_slave' + str(slave_info))
self.kill_slave(slave_info)
self.add_slave(slave_info)
def start(self, info):
logging.info('Master Starts')
self.last_wake_time = datetime.datetime.utcnow()
self.is_started = True
self.pop_forever_handler = threading.Thread(target=self.start_popping_tasks)
self.pop_forever_handler.start()
self.repeated_timer = RepeatedTimer(self.clean_time_gap, self.notice_refresh, None)
def pop_forever(self):
self.start_popping_tasks()
def get_task_queue_size(self):
pass
# TODO: There is a bottle neck here
def start_popping_tasks(self):
task_connection = ConnectionManager(queue_name=self.task_queue_name,
durable=True, no_ack=False)
eof_reached = False
while self.is_started and not eof_reached:
current_task_queue_size = task_connection.get_task_queue_size()
while self.is_started and current_task_queue_size < self.task_queue_size_limit:
task = self.task_manager.pop_task()
if task is None:
# TODO: Don't use Error. Just break and handle the case later in this function
logging.info('EOF Reached')
eof_reached = True
break
message = 'WORK ' + ujson.dumps(task)
task_connection.publish(message)
current_task_queue_size += 1
task_connection.stop()
def fail(self, slave_task_info):
self.task_manager.add_task(slave_task_info['task'])
self.slave_manager.update_last_response(slave_task_info)
def success(self, slave_task_info):
开发者ID:habemusne,项目名称:flexibloader,代码行数:67,代码来源:master.py
注:本文中的repeated_timer.RepeatedTimer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论