• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python time.sleep函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中time.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: save_table

 def save_table(self, code, date):
     """Fetch daily data for ``code`` and merge it into its HDF file.

     Two wrapper requests (opt10081 and opt10086) are issued, each preceded
     by a fixed four-second pause. A fixed set of opt10086 columns is joined
     column-wise onto the opt10081 frame. Rows whose index is greater than
     ``self.kiwoom.start_date`` (formatted as an integer YYYYMMDD) are kept
     and merged with the frame stored under key ``'day'`` in
     ``../data/hdf/<code>.hdf``; the last stored row is replaced by the
     freshly fetched rows from that index onwards.

     A missing file or an empty stored frame is reported with ``print`` and
     the fetched rows are written on their own. The write happens in a
     ``finally`` clause, so it is attempted even when an unexpected error is
     propagating.

     Args:
         code: Item code, used in both requests and in the HDF file name.
         date: Passed unchanged to both wrapper requests.
     """
     pause_seconds = 4
     extra_columns = ['전일비', '등락률', '금액(백만)', '신용비', '개인', '기관', '외인수량', '외국계', '프로그램',
                      '외인비', '체결강도', '외인보유', '외인비중', '외인순매수', '기관순매수', '개인순매수', '신용잔고율']

     time.sleep(pause_seconds)
     frame_81 = self.wrapper.get_data_opt10081(code, date)
     time.sleep(pause_seconds)
     frame_86 = self.wrapper.get_data_opt10086(code, date)
     data = pd.concat([frame_81, frame_86.loc[:, extra_columns]], axis=1)

     try:
         start_key = int(self.kiwoom.start_date.strftime("%Y%m%d"))
         data = data.loc[data.index > start_key]
         stored = pd.read_hdf("../data/hdf/%s.hdf" % code, 'day').sort_index()
         last_stored = stored.index[-1]
         # Drop the last stored row; it is re-supplied by the fresh data.
         stored = stored.loc[stored.index < last_stored]
         data = data.loc[data.index >= last_stored]
         data = pd.concat([stored, data], axis=0)
     except (FileNotFoundError, IndexError) as err:
         print(err)
     finally:
         data.index.name = '일자'
         if len(data) != 0:
             data.to_hdf('../data/hdf/%s.hdf' % code, 'day', mode='w')
开发者ID:juwarny,项目名称:PyTrader,代码行数:26,代码来源:save_data.py


示例2: wait_element_view

def wait_element_view(img, wait_time=1, confidence=0.999):
    """Poll the screen until ``img`` is located or the attempts run out.

    Args:
        img: Image passed to ``pyautogui.locateCenterOnScreen``.
        wait_time: Number of checks to make; a one-second pause follows
            every unsuccessful check.
        confidence: Forwarded to ``pyautogui.locateCenterOnScreen``.

    Returns:
        True as soon as the image is located, False if every check failed
        (or ``wait_time`` is zero or negative).
    """
    # range(wait_time), not range(1, wait_time): the latter is empty for the
    # default wait_time=1, so the screen was never inspected at all.
    for _ in range(wait_time):
        if pyautogui.locateCenterOnScreen(img, confidence=confidence) is not None:
            return True
        time.sleep(1)
    return False
开发者ID:rinkeigun,项目名称:linux_module,代码行数:7,代码来源:cad.py


示例3: run

 def run(self):
     """Call ``run_once`` forever, sleeping ``60 * self.interval`` seconds between calls.

     An error raised by ``run_once`` is reported on stdout and the loop
     carries on with the next cycle.
     """
     while True:
         try:
             self.run_once()
         except Exception:
             # Exception rather than a bare ``except`` so KeyboardInterrupt
             # and SystemExit can still stop the loop.
             print now(), "Error - ", get_err()
         time.sleep(60 * self.interval)
开发者ID:ulrichard,项目名称:funny-bot-bitcoin,代码行数:7,代码来源:bollingerbot.py


示例4: run

def run():
  print "Starting iotdata script"
 
#  feed = api.feeds.get(FEED_ID)
 
#  datastream = get_datastream(feed)
#  datastream.max_value = None
#  datastream.min_value = None
  client_carriots = Client(API_KEY)

  while True:
    timestamp = int(mktime(datetime.utcnow().timetuple()))
    load_avg = read_loadavg()
 
    if DEBUG:
      print "Updating feed with value: %s" % load_avg
      print "Time : %s" % timestamp

    data = {"protocol": "v2", "device": FEED_ID, "at": timestamp, "data": load_avg}
    carriots_response = client_carriots.send(data)
    print carriots_response.read() 
#    datastream.current_value = load_avg
#    datastream.at = datetime.datetime.utcnow()
#    try:
#      datastream.update()
#    except requests.HTTPError as e:
#      print "HTTPError({0}): {1}".format(e.errno, e.strerror)
 
    time.sleep(10)
开发者ID:jsanchezgit,项目名称:ecenter,代码行数:29,代码来源:iotdata.py


示例5: do_GET

 def do_GET(self):
     """Answer GET requests for the ``/book/`` and ``/timeout/`` routes.

     ``/book/`` with ``id`` as the first query parameter writes a JSON
     object with that id and a title (``"NA"`` for unknown ids).
     ``/timeout/`` sleeps four seconds before writing an empty body.
     Every other request writes an empty body straight away.
     """
     url = urlparse(self.path)
     route = url[2]
     pairs = [chunk.split("=") for chunk in url[4].split("&")]
     known_titles = {1: "SICP", 2: "jQuery programming"}

     if route == "/timeout/":
         time.sleep(4)
         self.wfile.write("")
         return

     if route != "/book/" or not (pairs and pairs[0][0] == "id"):
         self.wfile.write("")
         return

     book_id = int(pairs[0][1])
     payload = {'id': book_id, 'title': known_titles.get(book_id, "NA")}
     self.wfile.write(simplejson.dumps(payload))
开发者ID:Mondego,项目名称:pyreco,代码行数:25,代码来源:allPythonContent.py


示例6: location

def location(request, loc, format=None):
	'''
	Look up ``loc`` as a Building primary key, falling back to a Location,
	and answer in the requested ``format``.

	Intended to grow into a wrapper over all data models (locations,
	organizations, possibly people). Raises Http404 when neither model
	has the key. ``format='json'`` returns the object's JSON data with
	``info`` and ``marker`` added; any other format hands the same data
	to ``home``.
	'''
	from campus.models import Building, Location

	try:
		place = Building.objects.get(pk=loc)
	except Building.DoesNotExist:
		try:
			place = Location.objects.get(pk=loc)
		except Location.DoesNotExist:
			raise Http404()

	info_html = location_html(place, request)
	payload = place.json()
	payload['info'] = info_html
	# Absolute site root without the trailing slash.
	site_root = request.build_absolute_uri('/')[:-1]
	payload['marker'] = site_root + settings.MEDIA_URL + 'images/markers/yellow.png'

	if format == 'bubble':
		# NOTE(review): neither `template` nor `context` is assigned in
		# this function - unless they are module globals this branch
		# raises NameError. Confirm against the rest of the module.
		return render(request, template, context)

	if format == 'json':
		if settings.DEBUG:
			# Artificial delay in debug mode only.
			import time
			time.sleep(.5)

		response = HttpResponse(json.dumps(payload))
		response['Content-type'] = 'application/json'
		return response

	return home(request, location=payload)
开发者ID:beck,项目名称:Campus-Map,代码行数:34,代码来源:views.py


示例7: zelda

def zelda(): # sorry, this just had to happen... :)
	"""Play a rising beep jingle with ``winsound.Beep``, with a one-second pause before the last phrase."""
	phrase_a = [210, 225, 238, 250]
	phrase_b = [225, 238, 250, 262]
	phrase_c = [238, 250, 262, 275]

	# (frequency, duration in ms) pairs played before the pause.
	before_pause = (
		[(freq, 200) for freq in phrase_a] * 2
		+ [(freq, 180) for freq in phrase_b] * 2
		+ [(freq, 150) for freq in phrase_c]
		+ [(freq * 2, ms) for freq, ms in zip(phrase_c, (150, 150, 150, 160))]
	)
	# Final phrase: the first phrase at four times the frequency, long last note.
	after_pause = [(freq * 4, ms) for freq, ms in zip(phrase_a, (150, 150, 150, 1100))]

	for freq, ms in before_pause:
		winsound.Beep(freq, ms)
	time.sleep(1)
	for freq, ms in after_pause:
		winsound.Beep(freq, ms)
开发者ID:Ber10e,项目名称:Author_classifier,代码行数:35,代码来源:help_functions.py


示例8: _send

 def _send(self, command, retries=5, timeout=100):
     fd = self._fd
     if len(command) != 33:
         raise ValueError("command must be 33 bytes long")
     handler = signal.signal(signal.SIGALRM, _TimeoutError.timeout)
     for attempt in range(retries):
         signal.setitimer(signal.ITIMER_REAL, timeout/1000.0)
         try:
             if LOG.isEnabledFor(logging.DEBUG):
                 LOG.debug("Write: {}", hexlify(command[1:]))
             fd.write(command)
             fd.flush()
             reply = bytearray(fd.read(32))
             if LOG.isEnabledFor(logging.DEBUG):
                 LOG.debug("Recv: {}", hexlify(reply))
             signal.setitimer(signal.ITIMER_REAL, 0)
             if reply[0] != command[1]:
                 msg = "Expected msg type {} but got {}"
                 raise IOError(msg.format(command[1], reply[0]))
             return reply[1:]
         except _TimeoutError:
             print("IO timed out, try #%d." % attempt)
             time.sleep(0.000001)
         finally:
             signal.signal(signal.SIGALRM, handler)
     msg = "Gving up on PlasmaTrim {}"
     raise IOError(msg.format(self))
开发者ID:svirpridon,项目名称:plasmatrim,代码行数:27,代码来源:plasmatrim.py


示例9: connect_db

def connect_db(db_name):
    """Connect to the database if open, and start database if not running.

    If creating the server handle fails, ``couchdb -b`` is launched and the
    handle is created again after two seconds. A database that cannot be
    opened is created and seeded with a ``toc`` document (``n_runs`` = 0).
    The ``_design/all`` view is added when the database does not have it.

    Args:
        db_name: Name of the database to open or create.

    Returns:
        The database object for ``db_name``.
    """
    try:
        client = couchdb.Server()

    # Exception instead of a bare ``except`` so KeyboardInterrupt and
    # SystemExit are not mistaken for "server not running".
    except Exception:
        subprocess.call(['couchdb', '-b'])
        time.sleep(2)
        client = couchdb.Server()

    try:
        db = client[db_name]

    except Exception:
        # Treated as "database does not exist yet": create and seed it.
        client.create(db_name)
        db = client[db_name]
        toc = {}
        toc['n_runs'] = 0

        toc['_id'] = 'toc'
        db.save(toc)

    #create permanent view to all if one doesn't exist
    if '_design/all' not in db:
        view_def = ViewDefinition('all', 'all',''' 
				  function(doc) { 
				      if( doc.run_number )
					  emit(parseInt(doc.run_number), doc);
				  }''')
        view_def.sync(db)

    return db
开发者ID:uwmuonlab,项目名称:uw-lab-daq,代码行数:32,代码来源:start_online.py


示例10: test_fail_login

    def test_fail_login(self):
        print "Starting Login and Logout Test"
        wait = WebDriverWait(self.driver, self.waitTime)

        self.driver.get(self.url)
        wait.until(EC.element_to_be_clickable((By.ID, 'login-button'))).click()
        wait.until(
            EC.element_to_be_clickable((By.ID, 'weebly-username'))).click()
        wait.until(
            EC.element_to_be_clickable((By.ID, 'weebly-password'))).click()
        Username = self.driver.find_element_by_id('weebly-username')
        Username.send_keys(self.email)
        Password = self.driver.find_element_by_id('weebly-password')
        Password.send_keys("wrong")
        self.driver.find_element_by_xpath("//input[@value='Log in']").click()
        for i in range(60):
            try:
                if "Wrong username or password" == self.driver.find_element_by_css_selector("div.popover-content").text:
                    break
            except:
                pass
            time.sleep(1)
        else:
            self.fail("time out")
        try:
            self.assertEqual("Wrong username or password", self.driver.find_element_by_css_selector(
                "div.popover-content").text)
        except AssertionError as e:
            self.verificationErrors.append(str(e))
开发者ID:jcostello84,项目名称:web-test,代码行数:29,代码来源:login_fail_sauce_test.py


示例11: flush

 def flush(self):
     '''
     release the modified and deleted keys queued for this table
     '''
     # remove deleted keys
     for key in self.deleted_keys:
         self.values.remove_key_data(key)
         self.dynamo_table.remove_range_obj(key)
         time.sleep(0.5)
         if settings.DEBUG:
             print 'removed'
     
     self.deleted_keys = []
     
     # store modified changes
     for key in self.modified_keys:
         # save data by assigning from cached
         data = self.values.get_key_data(key)
         self.dynamo_table.set_range_obj(key, data)
         time.sleep(0.5)
         if settings.DEBUG:
             print 'updated '+key
     
     self.modified_keys = []
     
     self.dynamo_table.remove_range_obj('drilldown-lock')
     self.is_locked = False
开发者ID:encolpe,项目名称:apetizer,代码行数:27,代码来源:dynamodb.py


示例12: main_loop

 def main_loop(self):
     running = False
     while running != True:
         print 'STAND-BY'
         stick = self.read_stick()
         print stick
         if (self.stick_rest - int(stick[0])) > self.stick_grens:
             running = True
         else:
             time.sleep(self.waiting_period_sensors)
     print 'LOOP'
     location = self.read_arduino()[0:2]
     print self.session_names
     new_table(self.current_session_name)
     self.session_names += [self.current_session_name]
     past_periods_pic = 0
     pic_name = None
     while running:
         # Controleer of de knuppel in de juiste richting bewogen wordt om te stoppen met tracken:
         stick = self.read_stick()
         if (int(stick[0]) - self.stick_rest) > self.stick_grens:
             self.upload_mode()
         # Controleer of het voldoende lussen geleden is dat er nog een foto genomen is, en neem indien nodig een foto:
         if past_periods_pic >= self.number_of_periods_pic:
             pic_name = self.take_pic()
             past_periods_pic = 0
         else:
             pic_name = None
             past_periods_pic += 1
         # Run de tracker-functie en sla de teruggegeven gegevens op in een tijdelijke variabele temp:
         temp = self.tracker(self.current_session_name, location, pic_name)
         self.current_session_name = temp[0]
         location = temp[1]
         time.sleep(self.waiting_period_sensors)
开发者ID:JelleBosmans,项目名称:PenO_CWA1,代码行数:34,代码来源:SessionManager.py


示例13: __init__

  def __init__(self, port, baudrate, wait_ready = True, timeout = True):
    """Open the serial link and optionally wait for the board to answer.

    Args:
      port: Must be one of ``ARDUINO_PORTS``.
      baudrate: Must be one of ``Serial.BAUDRATES``.
      wait_ready: A non-bool value is passed to ``sleep`` as a delay.
        ``True`` blocks until ``read()`` returns data and stores the
        elapsed time in ``connection_first_answer``. ``False`` returns
        without waiting.
      timeout: Forwarded to ``Serial.__init__``.

    Raises:
      InvalidPort: ``port`` is not in ``ARDUINO_PORTS``.
      InvalidBaudRate: ``baudrate`` is not in ``Serial.BAUDRATES``.
    """
    # Local alias: the original called both ``time()`` and ``time.sleep()``,
    # which cannot both work against a single binding of ``time``.
    import time as _time

    # the port is not a known one
    if port not in ARDUINO_PORTS:
      raise InvalidPort(port)

    # the baud rate is not supported
    if baudrate not in Serial.BAUDRATES:
      raise InvalidBaudRate(baudrate)

    # call the parent constructor
    Serial.__init__(self, port, baudrate, timeout = timeout)

    # moment the connection was started
    self.connection_start = _time.time()
    # stays None unless the first answer is actually awaited below
    self.connection_first_answer = None

    # sleep for the requested delay
    if not isinstance(wait_ready, bool):
      _time.sleep(wait_ready)

    # wait until an answer arrives; was ``elif True``, which ignored
    # wait_ready=False
    elif wait_ready:
      while not self.read():
        continue

      # time needed to get the first answer
      self.connection_first_answer = _time.time() - self.connection_start
      print(self.connection_first_answer)
开发者ID:Kestrel67,项目名称:Python-Libraries,代码行数:28,代码来源:ArduinoInterface.py


示例14: echoServer

def echoServer(hostname='localhost',port=1972,timeout=5000):
    """Connect to a FieldTrip buffer and echo every new event back to it.

    Connection and header retrieval are retried once a second until a
    header is returned. After that the function waits for new events and
    re-sends each batch. Nothing in the loop assigns ``endExpt``, so in the
    visible code it runs until the process is interrupted.

    Args:
        hostname: Buffer host.
        port: Buffer port.
        timeout: Passed to ``ftc.wait`` for each wait call.
    """
    ftc = FieldTrip.Client()
    # Wait until the buffer connects correctly and returns a valid header
    hdr = None
    while hdr is None:
        print('Trying to connect to buffer on %s:%i ...'%(hostname,port))
        try:
            ftc.connect(hostname, port)
            print('\nConnected - trying to read header...')
            hdr = ftc.getHeader()
        except IOError:
            # Deliberate best effort: hdr stays None and we retry below.
            pass

        if hdr is None:
            print('Invalid Header... waiting')
            time.sleep(1)
        else:
            print(hdr)
            print(hdr.labels)

    # Now do the echo server
    nEvents = hdr.nEvents
    endExpt = None
    while endExpt is None:
        (curSamp, curEvents) = ftc.wait(-1, nEvents, timeout) # Block until there are new events to process
        if curEvents > nEvents: # get any new events
            evts = ftc.getEvents([nEvents, curEvents-1])
            nEvents = curEvents # update record of which events we've seen
            # Was putEvents(evt): ``evt`` is never defined, so the first
            # new event raised NameError.
            ftc.putEvents(evts)

    ftc.disconnect() # disconnect from buffer when done
开发者ID:jadref,项目名称:buffer_bci,代码行数:32,代码来源:echoServer_skel.py


示例15: poll

    def poll(self, timeout=None):
        """Wait for registered descriptors to become ready.

        Args:
            timeout: Passed to ``select.select``; ``None`` blocks.

        Returns:
            A list of ``(fd, events)`` tuples, where ``events`` is the
            bitwise OR of the values registered for that descriptor in the
            read, write and exception maps that reported it ready. On
            Windows with nothing registered, sleeps 0.1 s and returns None.
        """
        rfds = list(self._read_fds)
        wfds = list(self._write_fds)
        xfds = list(self._exception_fds)
        # FIXME Hack for Windows: select() with three empty sequences is
        # not supported there, so just pause briefly instead.
        if not rfds and not wfds and not xfds:
            import platform
            if platform.system() == 'Windows':
                import time
                time.sleep(0.1)
                return

        (rfds, wfds, xfds) = select.select(rfds, wfds, xfds, timeout)
        ready_fds = {}
        sources = (
            (rfds, self._read_fds),
            (wfds, self._write_fds),
            (xfds, self._exception_fds),
        )
        for ready, registered in sources:
            for fd in ready:
                if fd in ready_fds:
                    ready_fds[fd] |= registered[fd]
                else:
                    ready_fds[fd] = registered[fd]

        # items() instead of iteritems(): works on Python 2 and Python 3.
        return list(ready_fds.items())
开发者ID:chenziliang,项目名称:src,代码行数:28,代码来源:z_poller.py


示例16: __call__

    def __call__(self, user_name, cache=None):
        """Return the total bytes added by ``user_name`` between start and end.

        The start and end dates come from ``get_date`` using
        ``cache['reference_date']`` and the user's first-seen value. Each
        contribution is processed either through an rq queue (when
        ``self.rq`` is set) or in-process, and the per-contribution result
        is stored on it under ``'bytes_added'``.

        Args:
            user_name: User whose contributions are measured.
            cache: Dict shared with the other metrics; must contain
                ``'reference_date'``. A fresh dict is used when omitted.

        Returns:
            The sum of the per-contribution results, or None when either
            date could not be resolved.
        """
        # A ``{}`` default would be one dict shared by every call.
        if cache is None:
            cache = {}

        # NOTE(review): the fallback passed to cache.get() is evaluated on
        # every call, cached or not - confirm whether that is intended.
        first_seen_metric = userstats.user.api.core.FirstSeen()
        first_seen = cache.get(str(first_seen_metric), first_seen_metric(user_name, cache))
        start = get_date(self.start, cache['reference_date'], first_seen, self.strict)
        end = get_date(self.end, cache['reference_date'], first_seen, self.strict)

        if not start or not end:
            return None
        contribs_metric = Contribs()
        contribs = cache.get(str(contribs_metric), contribs_metric(user_name, cache))

        if self.rq:
            q = rq.Queue('diffs', connection=redis.Redis(port=self.redis_port))
            results = [q.enqueue(process_contrib_bytes_added, (contrib, start, end, self.ns, user_name, cache)) for contrib in contribs]

            # Poll until every job has either finished or failed.
            while not all(map(lambda res: res.is_finished or res.is_failed, results)):
                time.sleep(.1)
            # list(): the results are iterated twice below, and a bare
            # map() is a one-shot iterator on Python 3.
            bytes_added_results = list(map(attrgetter('result'), results))
        else:
            bytes_added_results = list(map(process_contrib_bytes_added, zip(contribs,
                                                                        itertools.repeat(start),
                                                                        itertools.repeat(end),
                                                                        itertools.repeat(self.ns),
                                                                        itertools.repeat(user_name),
                                                                        itertools.repeat(cache))))
        for b, contrib in zip(bytes_added_results, contribs):
            contrib['bytes_added'] = b
        return sum(bytes_added_results)
开发者ID:embr,项目名称:userstats,代码行数:31,代码来源:survival.py


示例17: setup_backend

    def setup_backend(self):
        """Connect to MongoDB, store the database handle and ensure the indexes.

        The URI is popped from ``self.options['uri']``. Opening the
        database is tried up to five times, pausing 0.1 s after each
        ``AutoReconnect``.

        Raises:
            AutoReconnect: All five attempts failed. Previously the code
                fell through with ``database`` still holding the database
                name and failed later with an unrelated AttributeError.
        """
        from pymongo import ASCENDING, DESCENDING
        from pymongo.connection import Connection, _parse_uri
        from pymongo.errors import AutoReconnect

        _connection = None
        uri = self.options.pop('uri', u'')
        last_error = None

        hosts, database, user, password = _parse_uri(uri, Connection.PORT)

        # Handle auto reconnect signals properly
        for _ in range(5):
            try:
                if _connection is None:
                    _connection = Connection(uri)
                # Rebinds ``database`` from the parsed name to the handle.
                database = _connection[database]
                break
            except AutoReconnect as exc:
                last_error = exc
                time.sleep(0.1)
        else:
            raise last_error

        self.database = database

        # setup correct indexes
        database.tickets.ensure_index([('record_hash', ASCENDING)], unique=True)
        database.tickets.ensure_index([('solved', ASCENDING), ('level', ASCENDING)])
        database.occurrences.ensure_index([('time', DESCENDING)])
开发者ID:RonnyPfannschmidt,项目名称:logbook,代码行数:28,代码来源:ticketing.py


示例18: crawl

	def crawl(self):
		_starttime = time.time()
		if self.restrict == None:
			self.restrict = "http://%s.*" % self.init_domain

		print "Deadlink-crawler version 1.1"
		print "Starting crawl from URL %s at %s with restriction %s\n" % (self.init_url, strftime("%Y-%m-%d %H:%M:%S", gmtime()), "http://%s.*" % self.init_domain)

		while len(self.frontier) > 0:
			time.sleep(self.wait_time)
			
			next_time, next_url = self.frontier.next()
			
			while time.time() < next_time:
				time.sleep(0.5)
			
			try:
				self.visit_url(next_url[0], next_url[1])
			except urllib2.URLError:
				continue
		
		self.print_deadlinks(self.deadlinks)

		_elapsed = time.time() - _starttime
		
		print "\nSummary:\n--------"
		print "Crawled %d pages and checked %d links in %s time." % (self._pages, self._links, strftime("%H:%M:%S", gmtime(_elapsed)))
		print "Found a total of %d deadlinks in %d different pages" % (self._dead, self._via)
		
		if len(self.deadlinks) == 0:
			exit(0)
		else:
			exit(2)
开发者ID:janhoy,项目名称:deadlink-crawler,代码行数:33,代码来源:crawler.py


示例19: stop

 def stop(self):
     """End the current trace, if any, and signal the proxy process to stop.

     When a proxy process is set, ``_stop_signaled`` is set to True,
     SIGUSR1 is sent to its pid, and the call pauses for half a second.
     """
     if self._cur_trace:
         self.end_trace()

     proxy = self._proxy_proc
     if not proxy:
         return

     self._stop_signaled = True
     os.kill(proxy.pid, signal.SIGUSR1)
     time.sleep(0.5)
开发者ID:zncb,项目名称:wtrace,代码行数:7,代码来源:workers.py


示例20: main

    def main(self, name):
        """Command-line entry point: record one event taken from the arguments.

        Does nothing unless ``name`` is ``"__main__"``. Expects one
        positional argument (the event type) and optionally a second one
        holding JSON, which is expanded into keyword arguments for
        ``self.record``. Invalid usage or unparsable JSON is reported
        through the option parser's ``error``.
        """
        if name != "__main__":
            return

        import optparse
        import time

        cli = optparse.OptionParser()
        _options, positional = cli.parse_args()

        if len(positional) not in (1, 2):
            cli.error("Must supply exactly one event type and"
                      " optionally one JSON data structure.")

        event_type = positional[0]
        data = {}
        if len(positional) == 2:
            try:
                data = json.loads(positional[1])
            except Exception:
                cli.error("Could not parse JSON")

        self.record(event_type, **data)
        # This needs to be removed but for now is necessary to make sure
        # everything gets flushed
        time.sleep(1)
开发者ID:lgastako,项目名称:helios,代码行数:26,代码来源:abstractclient.py



注:本文中的time.sleep函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python time.strftime函数代码示例发布时间:2022-05-27
下一篇:
Python time.localtime函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap