本文整理汇总了Python中time.time.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: save_table
def save_table(self, code, date):
TR_REQ_TIME_INTERVAL = 4
time.sleep(TR_REQ_TIME_INTERVAL)
data_81 = self.wrapper.get_data_opt10081(code, date)
time.sleep(TR_REQ_TIME_INTERVAL)
data_86 = self.wrapper.get_data_opt10086(code, date)
col_86 = ['전일비', '등락률', '금액(백만)', '신용비', '개인', '기관', '외인수량', '외국계', '프로그램',
'외인비', '체결강도', '외인보유', '외인비중', '외인순매수', '기관순매수', '개인순매수', '신용잔고율']
data = pd.concat([data_81, data_86.loc[:, col_86]], axis=1)
#con = sqlite3.connect("../data/stock.db")
try:
data = data.loc[data.index > int(self.kiwoom.start_date.strftime("%Y%m%d"))]
#orig_data = pd.read_sql("SELECT * FROM '%s'" % code, con, index_col='일자').sort_index()
orig_data = pd.read_hdf("../data/hdf/%s.hdf" % code, 'day').sort_index()
end_date = orig_data.index[-1]
orig_data = orig_data.loc[orig_data.index < end_date]
data = data.loc[data.index >= end_date]
data = pd.concat([orig_data, data], axis=0)
except (FileNotFoundError, IndexError) as e:
print(e)
pass
finally:
data.index.name = '일자'
if len(data) != 0:
#data.to_sql(code, con, if_exists='replace')
data.to_hdf('../data/hdf/%s.hdf'%code, 'day', mode='w')
开发者ID:juwarny,项目名称:PyTrader,代码行数:26,代码来源:save_data.py
示例2: wait_element_view
def wait_element_view(img, wait_time=1, confidence=0.999):
for it in range(1, wait_time):
if pyautogui.locateCenterOnScreen(img, confidence=confidence) is None:
time.sleep(1)
continue
else:
break
开发者ID:rinkeigun,项目名称:linux_module,代码行数:7,代码来源:cad.py
示例3: run
def run(self):
while 1:
try:
self.run_once()
except:
print now(), "Error - ", get_err()
time.sleep(60 * self.interval)
开发者ID:ulrichard,项目名称:funny-bot-bitcoin,代码行数:7,代码来源:bollingerbot.py
示例4: run
def run():
print "Starting iotdata script"
# feed = api.feeds.get(FEED_ID)
# datastream = get_datastream(feed)
# datastream.max_value = None
# datastream.min_value = None
client_carriots = Client(API_KEY)
while True:
timestamp = int(mktime(datetime.utcnow().timetuple()))
load_avg = read_loadavg()
if DEBUG:
print "Updating feed with value: %s" % load_avg
print "Time : %s" % timestamp
data = {"protocol": "v2", "device": FEED_ID, "at": timestamp, "data": load_avg}
carriots_response = client_carriots.send(data)
print carriots_response.read()
# datastream.current_value = load_avg
# datastream.at = datetime.datetime.utcnow()
# try:
# datastream.update()
# except requests.HTTPError as e:
# print "HTTPError({0}): {1}".format(e.errno, e.strerror)
time.sleep(10)
开发者ID:jsanchezgit,项目名称:ecenter,代码行数:29,代码来源:iotdata.py
示例5: do_GET
def do_GET(self):
parts = urlparse(self.path)
query = parts[2]
params = [param.split("=") for param in parts[4].split("&")]
if query == "/book/":
if params and params[0][0] == "id":
id = int(params[0][1])
if id == 1:
title = "SICP"
elif id == 2:
title = "jQuery programming"
else:
title = "NA"
self.wfile.write(
simplejson.dumps(
{'id':id,'title':title}
)
)
else:
self.wfile.write("")
elif query == "/timeout/":
time.sleep(4)
self.wfile.write("")
else:
self.wfile.write("")
开发者ID:Mondego,项目名称:pyreco,代码行数:25,代码来源:allPythonContent.py
示例6: location
def location(request, loc, format=None):
'''
Will one day be a wrapper for all data models, searching over all locations
and organizations, maybe even people too
'''
from campus.models import Building, Location
try:
location = Building.objects.get(pk=loc)
except Building.DoesNotExist:
try:
location = Location.objects.get(pk=loc)
except Location.DoesNotExist:
raise Http404()
html = location_html(location, request)
location = location.json()
location['info'] = html
base_url = request.build_absolute_uri('/')[:-1]
location['marker'] = base_url + settings.MEDIA_URL + 'images/markers/yellow.png'
if format == 'bubble':
return render(request, template, context)
if format == 'json':
if settings.DEBUG:
import time
time.sleep(.5)
response = HttpResponse(json.dumps(location))
response['Content-type'] = 'application/json'
return response
return home(request, location=location)
开发者ID:beck,项目名称:Campus-Map,代码行数:34,代码来源:views.py
示例7: zelda
def zelda(): # sorry, dit moest even... :)
winsound.Beep(210,200)
winsound.Beep(225,200)
winsound.Beep(238,200)
winsound.Beep(250,200)
winsound.Beep(210,200)
winsound.Beep(225,200)
winsound.Beep(238,200)
winsound.Beep(250,200)
winsound.Beep(225,180)
winsound.Beep(238,180)
winsound.Beep(250,180)
winsound.Beep(262,180)
winsound.Beep(225,180)
winsound.Beep(238,180)
winsound.Beep(250,180)
winsound.Beep(262,180)
winsound.Beep(238,150)
winsound.Beep(250,150)
winsound.Beep(262,150)
winsound.Beep(275,150)
winsound.Beep(238*2,150)
winsound.Beep(250*2,150)
winsound.Beep(262*2,150)
winsound.Beep(275*2,160)
time.sleep(1)
winsound.Beep(210*4,150)
winsound.Beep(225*4,150)
winsound.Beep(238*4,150)
winsound.Beep(250*4,1100)
开发者ID:Ber10e,项目名称:Author_classifier,代码行数:35,代码来源:help_functions.py
示例8: _send
def _send(self, command, retries=5, timeout=100):
fd = self._fd
if len(command) != 33:
raise ValueError("command must be 33 bytes long")
handler = signal.signal(signal.SIGALRM, _TimeoutError.timeout)
for attempt in range(retries):
signal.setitimer(signal.ITIMER_REAL, timeout/1000.0)
try:
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("Write: {}", hexlify(command[1:]))
fd.write(command)
fd.flush()
reply = bytearray(fd.read(32))
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("Recv: {}", hexlify(reply))
signal.setitimer(signal.ITIMER_REAL, 0)
if reply[0] != command[1]:
msg = "Expected msg type {} but got {}"
raise IOError(msg.format(command[1], reply[0]))
return reply[1:]
except _TimeoutError:
print("IO timed out, try #%d." % attempt)
time.sleep(0.000001)
finally:
signal.signal(signal.SIGALRM, handler)
msg = "Gving up on PlasmaTrim {}"
raise IOError(msg.format(self))
开发者ID:svirpridon,项目名称:plasmatrim,代码行数:27,代码来源:plasmatrim.py
示例9: connect_db
def connect_db(db_name):
"""Connect to the database if open, and start database if not running."""
try:
client = couchdb.Server()
except:
subprocess.call(['couchdb', '-b'])
time.sleep(2)
client = couchdb.Server()
try:
db = client[db_name]
except:
client.create(db_name)
db = client[db_name]
toc = {}
toc['n_runs'] = 0
toc['_id'] = 'toc'
db.save(toc)
#create permanent view to all if one doesn't exist
if '_design/all' not in db:
view_def = ViewDefinition('all', 'all','''
function(doc) {
if( doc.run_number )
emit(parseInt(doc.run_number), doc);
}''')
view_def.sync(db)
return db
开发者ID:uwmuonlab,项目名称:uw-lab-daq,代码行数:32,代码来源:start_online.py
示例10: test_fail_login
def test_fail_login(self):
print "Starting Login and Logout Test"
wait = WebDriverWait(self.driver, self.waitTime)
self.driver.get(self.url)
wait.until(EC.element_to_be_clickable((By.ID, 'login-button'))).click()
wait.until(
EC.element_to_be_clickable((By.ID, 'weebly-username'))).click()
wait.until(
EC.element_to_be_clickable((By.ID, 'weebly-password'))).click()
Username = self.driver.find_element_by_id('weebly-username')
Username.send_keys(self.email)
Password = self.driver.find_element_by_id('weebly-password')
Password.send_keys("wrong")
self.driver.find_element_by_xpath("//input[@value='Log in']").click()
for i in range(60):
try:
if "Wrong username or password" == self.driver.find_element_by_css_selector("div.popover-content").text:
break
except:
pass
time.sleep(1)
else:
self.fail("time out")
try:
self.assertEqual("Wrong username or password", self.driver.find_element_by_css_selector(
"div.popover-content").text)
except AssertionError as e:
self.verificationErrors.append(str(e))
开发者ID:jcostello84,项目名称:web-test,代码行数:29,代码来源:login_fail_sauce_test.py
示例11: flush
def flush(self):
'''
release the modified and deleted keys queued for this table
'''
# remove deleted keys
for key in self.deleted_keys:
self.values.remove_key_data(key)
self.dynamo_table.remove_range_obj(key)
time.sleep(0.5)
if settings.DEBUG:
print 'removed'
self.deleted_keys = []
# store modified changes
for key in self.modified_keys:
# save data by assigning from cached
data = self.values.get_key_data(key)
self.dynamo_table.set_range_obj(key, data)
time.sleep(0.5)
if settings.DEBUG:
print 'updated '+key
self.modified_keys = []
self.dynamo_table.remove_range_obj('drilldown-lock')
self.is_locked = False
开发者ID:encolpe,项目名称:apetizer,代码行数:27,代码来源:dynamodb.py
示例12: main_loop
def main_loop(self):
running = False
while running != True:
print 'STAND-BY'
stick = self.read_stick()
print stick
if (self.stick_rest - int(stick[0])) > self.stick_grens:
running = True
else:
time.sleep(self.waiting_period_sensors)
print 'LOOP'
location = self.read_arduino()[0:2]
print self.session_names
new_table(self.current_session_name)
self.session_names += [self.current_session_name]
past_periods_pic = 0
pic_name = None
while running:
# Controleer of de knuppel in de juiste richting bewogen wordt om te stoppen met tracken:
stick = self.read_stick()
if (int(stick[0]) - self.stick_rest) > self.stick_grens:
self.upload_mode()
# Controleer of het voldoende lussen geleden is dat er nog een foto genomen is, en neem indien nodig een foto:
if past_periods_pic >= self.number_of_periods_pic:
pic_name = self.take_pic()
past_periods_pic = 0
else:
pic_name = None
past_periods_pic += 1
# Run de tracker-functie en sla de teruggegeven gegevens op in een tijdelijke variabele temp:
temp = self.tracker(self.current_session_name, location, pic_name)
self.current_session_name = temp[0]
location = temp[1]
time.sleep(self.waiting_period_sensors)
开发者ID:JelleBosmans,项目名称:PenO_CWA1,代码行数:34,代码来源:SessionManager.py
示例13: __init__
def __init__(self, port, baudrate, wait_ready = True, timeout = True):
# si le port n'est pas utilisé
if port not in ARDUINO_PORTS:
raise InvalidPort(port)
# si le baudrate n'existe pas
if baudrate not in Serial.BAUDRATES:
raise InvalidBaudRate(baudrate)
# on appel le constructeur parent
Serial.__init__(self, port, baudrate, timeout = timeout)
# connection start : temps de début de connexion
self.connection_start = time()
# on dort un petit moment
if not isinstance(wait_ready, bool):
time.sleep(wait_ready)
# temps qu'on n'a pas de réponse on attend
elif True:
while not self.read():
continue
# temps nécéssaire pour obtenir la première connexion
self.connection_first_answer = time() - self.connection_start
print(self.connection_first_answer)
开发者ID:Kestrel67,项目名称:Python-Libraries,代码行数:28,代码来源:ArduinoInterface.py
示例14: echoServer
def echoServer(hostname='localhost',port=1972,timeout=5000):
ftc = FieldTrip.Client()
# Wait until the buffer connects correctly and returns a valid header
hdr = None;
while hdr is None :
print('Trying to connect to buffer on %s:%i ...'%(hostname,port))
try:
ftc.connect(hostname, port)
print('\nConnected - trying to read header...')
hdr = ftc.getHeader()
except IOError:
pass
if hdr is None:
print('Invalid Header... waiting')
time.sleep(1)
else:
print(hdr)
print(hdr.labels)
# Now do the echo server
nEvents=hdr.nEvents;
endExpt=None;
while endExpt is None :
(curSamp,curEvents)=ftc.wait(-1,nEvents,timeout) # Block until there are new events to process
if curEvents>nEvents : # get any new events
evts=ftc.getEvents([nEvents,curEvents-1])
nEvents=curEvents # update record of which events we've seen
ftc.putEvents(evt)
ftc.disconnect() # disconnect from buffer when done
开发者ID:jadref,项目名称:buffer_bci,代码行数:32,代码来源:echoServer_skel.py
示例15: poll
def poll(self, timeout=None):
rfds, wfds, xfds = self._read_fds.keys(), self._write_fds.keys(), self._exception_fds.keys()
# FIXME Hack for Windows
if not rfds and not wfds and not xfds:
import platform
if platform.system() == 'Windows':
import time
time.sleep(0.1)
return
(rfds, wfds, xfds) = select.select(rfds, wfds, xfds, timeout)
ready_fds = {}
for fd in rfds:
ready_fds[fd] = self._read_fds[fd]
for fd in wfds:
if fd in ready_fds:
ready_fds[fd] |= self._write_fds[fd]
else:
ready_fds[fd] = self._write_fds[fd]
for fd in xfds:
if fd in ready_fds:
ready_fds[fd] |= self._exception_fds[fd]
else:
ready_fds[fd] = self._exception_fds[fd]
return [fd_evt for fd_evt in ready_fds.iteritems()]
开发者ID:chenziliang,项目名称:src,代码行数:28,代码来源:z_poller.py
示例16: __call__
def __call__(self, user_name, cache={}):
first_seen_metric = userstats.user.api.core.FirstSeen()
first_seen = cache.get(str(first_seen_metric), first_seen_metric(user_name, cache))
start = get_date(self.start, cache['reference_date'], first_seen, self.strict)
end = get_date(self.end, cache['reference_date'], first_seen, self.strict)
if not start or not end:
return None
contribs_metric = Contribs()
contribs = cache.get(str(contribs_metric), contribs_metric(user_name, cache))
if self.rq:
q = rq.Queue('diffs', connection=redis.Redis(port=self.redis_port))
results = [q.enqueue(process_contrib_bytes_added, (contrib, start, end, self.ns, user_name, cache)) for contrib in contribs]
while not all(map(lambda res: res.is_finished or res.is_failed, results)):
time.sleep(.1)
bytes_added_results = map(attrgetter('result'), results)
else:
bytes_added_results = map(process_contrib_bytes_added, zip(contribs,
itertools.repeat(start),
itertools.repeat(end),
itertools.repeat(self.ns),
itertools.repeat(user_name),
itertools.repeat(cache)))
#bytes_added = {}
for b, contrib in zip(bytes_added_results, contribs):
#bytes_added[contrib['revid']] = b
contrib['bytes_added'] = b
return sum(bytes_added_results)
开发者ID:embr,项目名称:userstats,代码行数:31,代码来源:survival.py
示例17: setup_backend
def setup_backend(self):
from pymongo import ASCENDING, DESCENDING
from pymongo.connection import Connection, _parse_uri
from pymongo.errors import AutoReconnect
_connection = None
uri = self.options.pop('uri', u'')
_connection_attempts = 0
hosts, database, user, password = _parse_uri(uri, Connection.PORT)
# Handle auto reconnect signals properly
while _connection_attempts < 5:
try:
if _connection is None:
_connection = Connection(uri)
database = _connection[database]
break
except AutoReconnect:
_connection_attempts += 1
time.sleep(0.1)
self.database = database
# setup correct indexes
database.tickets.ensure_index([('record_hash', ASCENDING)], unique=True)
database.tickets.ensure_index([('solved', ASCENDING), ('level', ASCENDING)])
database.occurrences.ensure_index([('time', DESCENDING)])
开发者ID:RonnyPfannschmidt,项目名称:logbook,代码行数:28,代码来源:ticketing.py
示例18: crawl
def crawl(self):
_starttime = time.time()
if self.restrict == None:
self.restrict = "http://%s.*" % self.init_domain
print "Deadlink-crawler version 1.1"
print "Starting crawl from URL %s at %s with restriction %s\n" % (self.init_url, strftime("%Y-%m-%d %H:%M:%S", gmtime()), "http://%s.*" % self.init_domain)
while len(self.frontier) > 0:
time.sleep(self.wait_time)
next_time, next_url = self.frontier.next()
while time.time() < next_time:
time.sleep(0.5)
try:
self.visit_url(next_url[0], next_url[1])
except urllib2.URLError:
continue
self.print_deadlinks(self.deadlinks)
_elapsed = time.time() - _starttime
print "\nSummary:\n--------"
print "Crawled %d pages and checked %d links in %s time." % (self._pages, self._links, strftime("%H:%M:%S", gmtime(_elapsed)))
print "Found a total of %d deadlinks in %d different pages" % (self._dead, self._via)
if len(self.deadlinks) == 0:
exit(0)
else:
exit(2)
开发者ID:janhoy,项目名称:deadlink-crawler,代码行数:33,代码来源:crawler.py
示例19: stop
def stop(self):
if self._cur_trace:
self.end_trace()
if self._proxy_proc:
self._stop_signaled = True
os.kill(self._proxy_proc.pid,signal.SIGUSR1)
time.sleep(0.5)
开发者ID:zncb,项目名称:wtrace,代码行数:7,代码来源:workers.py
示例20: main
def main(self, name):
if name != "__main__":
return
import optparse
parser = optparse.OptionParser()
options, args = parser.parse_args()
if not 0 < len(args) < 3:
parser.error("Must supply exactly one event type and"
" optionally one JSON data structure.")
event_type = args[0]
if len(args) > 1:
json_data = args[1]
try:
data = json.loads(json_data)
except Exception:
parser.error("Could not parse JSON")
else:
data = {}
self.record(event_type, **data)
#print "Recorded event: %s (%s)" % (event_type, data)
# This needs to be removed but for now is necessary to make sure
# everything gets flushed
import time
time.sleep(1)
开发者ID:lgastako,项目名称:helios,代码行数:26,代码来源:abstractclient.py
注:本文中的time.time.sleep函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论