本文整理汇总了Python中time.ctime函数的典型用法代码示例。如果您正苦于以下问题:Python ctime函数的具体用法?Python ctime怎么用?Python ctime使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ctime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: main
def main():
    """Run the two threaded phases in sequence and report the elapsed time.

    ``get_column_urls()`` is called once in the calling thread.  Then
    ``WORKER_THREAD_NUM`` daemon threads run ``get_article_urls`` and are
    joined, and only after that ``WORKER_THREAD_NUM`` daemon threads run
    ``worker`` and are joined.  The start time and the total number of
    elapsed seconds are printed.
    """
    started = time.time()
    print(time.ctime())
    get_column_urls()
    # Phase 1: the producers collect the article urls.
    _run_threads(get_article_urls, producer_threads)
    # Phase 2: the workers start only after every producer has finished.
    _run_threads(worker, threads)
    finished = time.time()
    print('finished in %s seconds' % (finished - started))


def _run_threads(target, registry):
    """Start WORKER_THREAD_NUM daemon threads running *target* and join them.

    Every new thread is appended to *registry* (one of the module-level
    thread lists).  All threads in *registry* are then joined, including any
    that were already in the list before this call.
    """
    for _ in xrange(WORKER_THREAD_NUM):
        thread = threading.Thread(target=target)
        # Daemon threads do not keep the interpreter alive on their own.
        thread.setDaemon(True)
        thread.start()
        registry.append(thread)
    for thread in registry:
        thread.join()
开发者ID:tuner24,项目名称:practice,代码行数:26,代码来源:BP11.py
示例2: func
def func(args):
    """Write a diff of ``args.FILE1`` against ``args.FILE2`` to standard output.

    The output format is picked from the first truthy flag among
    ``args.unified``, ``args.ndiff`` and ``args.html``; a context diff is
    produced when none of them is set.  ``args.lines`` is the number of
    context lines and ``args.context`` is forwarded to the HTML renderer.

    If either file cannot be read, an error is printed to standard error for
    each unreadable file and the function returns without writing a diff.
    """
    sources = []
    readable = True
    for filename in (args.FILE1, args.FILE2):
        try:
            # The modification time becomes the file date in the diff header.
            filedate = time.ctime(os.stat(filename).st_mtime)
            # Text mode already translates newlines; the historical 'U' flag
            # added nothing on Python 3 and is rejected by Python 3.11+.
            with open(filename) as fd:
                sources.append((fd.readlines(), filedate))
        except (IOError, OSError):
            print("Error opening file " + filename, file=sys.stderr)
            readable = False
    if not readable:
        # Without both inputs there is nothing to compare.
        return
    (fromlines, fromdate), (tolines, todate) = sources

    # Create diff
    if args.unified:
        diff = difflib.unified_diff(fromlines, tolines, args.FILE1, args.FILE2,
                                    fromdate, todate, n=args.lines)
    elif args.ndiff:
        diff = difflib.ndiff(fromlines, tolines)
    elif args.html:
        diff = difflib.HtmlDiff().make_file(fromlines, tolines, args.FILE1,
                                            args.FILE2, context=args.context,
                                            numlines=args.lines)
    else:
        diff = difflib.context_diff(fromlines, tolines, args.FILE1, args.FILE2,
                                    fromdate, todate, n=args.lines)

    # writelines() is used because most of the diff functions return generators.
    sys.stdout.writelines(diff)
开发者ID:jaraco,项目名称:pycoreutils,代码行数:35,代码来源:diff.py
示例3: get
def get(self):
    """Return an app's Apple push certificate and key to an authenticated caller.

    The request must carry the ``X-Nuntiuz-Secret`` header equal to the
    configured ``jabberSecret`` and an ``id`` parameter naming an existing
    app; otherwise nothing is written to the response.

    When the certificate expires within 30 days a warning mail is sent, and
    within 15 days an error is logged as well.  The response body is the
    JSON document (cert, key, valid_until) passed through
    ``encrypt_for_jabber_cloud`` with the request secret.
    """
    settings = get_server_settings()
    secret = self.request.headers.get("X-Nuntiuz-Secret", None)
    if secret != settings.jabberSecret:
        logging.error("Received unauthenticated apple certificate request, ignoring ...")
        return

    app_id = self.request.get("id")
    if not app_id:
        return
    app = get_app_by_id(app_id)
    if not app:
        return

    valid_until = app.apple_push_cert_valid_until
    if valid_until < now() + 30 * DAY:
        # The messages label the time as GMT, so it is rendered from
        # gmtime(); time.ctime() would format it in the server's local zone.
        expiry_message = "The APN cert of %s is valid until %s GMT" % (
            app_id, time.asctime(time.gmtime(valid_until)))
        send_mail(settings.dashboardEmail,
                  settings.supportWorkers,
                  "The APN cert of %s is about to expire" % app_id,
                  expiry_message)
        if valid_until < now() + 15 * DAY:
            logging.error(expiry_message)

    result = json.dumps(dict(cert=app.apple_push_cert, key=app.apple_push_key, valid_until=valid_until))
    self.response.headers['Content-Type'] = 'application/binary'
    _, data = encrypt_for_jabber_cloud(secret, result)
    self.response.write(data)
开发者ID:rogerthat-platform,项目名称:rogerthat-backend,代码行数:25,代码来源:admin.py
示例4: test2
def test2():
    """Print a series of sample values from the ``time`` and ``datetime`` modules."""
    date_text = time.strftime("%Y-%m-%d")
    for item in (date_text, type(date_text)):
        print(item)

    current = datetime.datetime.now()
    for item in (current, type(current)):
        print(item)

    print(time.strftime("_%Y_%m_%d"))

    # weekday() shows which day of the week the given date is (Monday is 0).
    print(datetime.datetime(2016, 8, 7).weekday())

    print(str(int(time.time() * 1000)))
    stamp_now = time.time()
    print(stamp_now)
    print(time.ctime())

    fixed_stamp = 1503048528 * 1.00
    print(time.ctime(stamp_now))
    fixed_text = time.ctime(fixed_stamp)
    print(fixed_text)
    print(time.gmtime(fixed_stamp))
    print(type(fixed_text))
开发者ID:Rockyzsu,项目名称:base_function,代码行数:29,代码来源:time_function.py
示例5: refresh_data
def refresh_data():
    """Rebuild ``metrics``, ``targets_all`` and ``graphs_all`` from the backend.

    Returns ``'ok'`` after a rebuild, or an explanatory message when the
    metrics file is older than ``last_update`` and no rebuild is done.  When
    the backend raises ``MetricsError`` the error is stored under
    ``errors['metrics_file']``, the response status is set to 500 and a
    summary of all recorded errors is returned.
    """
    global metrics
    global targets_all
    global graphs_all
    logger.debug('refresh_data() start')
    try:
        stat_metrics = backend.stat_metrics()
        already_current = (last_update is not None
                           and stat_metrics.st_mtime < last_update)
        if already_current:
            nice_metrics_mtime = time.ctime(stat_metrics.st_mtime)
            nice_last_update = time.ctime(last_update)
            logger.debug('refresh_data() not needed. metrics.json mtime %s, last_update %s', nice_metrics_mtime, nice_last_update)
            template = ("metrics.json is last updated %s, "
                        "rebuilding data structures wouldn't make sense cause they were "
                        "last rebuilt %s")
            return template % (nice_metrics_mtime, nice_last_update)
        # s_metrics is not defined in this function; presumably a module-level object.
        rebuilt = backend.update_data(s_metrics)
        (metrics, targets_all, graphs_all) = rebuilt
        # A successful rebuild clears any earlier metrics-file error.
        if 'metrics_file' in errors:
            del errors['metrics_file']
        logger.debug('refresh_data() end ok')
        return 'ok'
    except MetricsError as e:
        errors['metrics_file'] = (e.msg, e.underlying_error)
        logger.error("[%s] %s", e.msg, e.underlying_error)
    logger.error('refresh_data() failed')
    response.status = 500
    summary = ' '.join('[%s] %s' % (k, v) for (k, v) in errors.items())
    return "errors: %s" % summary
开发者ID:sseshachala,项目名称:graph-explorer,代码行数:25,代码来源:app.py
示例6: Socket_TestNext
def Socket_TestNext(ippool):
    """Worker loop: probe randomly chosen entries of *ippool* until told to stop.

    Each iteration pops one random entry from *ippool* while holding ``Lock``,
    probes it with ``Socket_Test.Socket_Test`` and, when the first element of
    the result is True, appends the result to ``Wait_for_SSL``.  The loop ends
    when ``isEnd`` becomes true, on KeyboardInterrupt, or on a
    ValueError/IndexError.  Always returns 0.

    NOTE(review): the block nesting below was reconstructed from a copy that
    had lost its indentation - confirm it against the original file.
    """
    global Flag, Wait_for_SSL, Succ, InProcess, isEnd
    # Register this worker in InProcess for as long as it is running.
    Lock.acquire()
    InProcess.append(1)
    Lock.release()
    while not isEnd:
        try:
            Lock.acquire()
            # random.randrange(0) raises ValueError once the pool is empty;
            # that is handled by the (ValueError, IndexError) clause below.
            ip = ippool.pop(random.randrange(len(ippool)))
            Lock.release()
            # Progress report each time the pool size is a multiple of 1000
            # (the sizes are read here without holding Lock).
            if len(ippool)%1000 == 0:
                print len(Succ),"/",len(ippool), "/",len(Wait_for_SSL),"/",len(InProcess)
                print time.ctime(time.time())
                #print time.time()
            Soc = Socket_Test.Socket_Test(ip)
            if Soc[0] == True:
                Lock.acquire()
                Wait_for_SSL.append(Soc)
                Lock.release()
            # Presumably yields to other greenlets - confirm against gevent usage.
            gevent.sleep()
        except KeyboardInterrupt:
            # NOTE(review): this path returns without removing this worker's
            # InProcess entry and without releasing Lock if it was held.
            isEnd = True
            return 0
        except (ValueError , IndexError):
            # When the exception came from the pop above, Lock is still held
            # and must be released first.
            # NOTE(review): if it was raised after Lock had already been
            # released, this release() acts on an unheld lock - confirm.
            Lock.release()
            Flag = True
            Lock.acquire()
            InProcess.pop()
            Lock.release()
            return 0
    # Normal exit (isEnd was set elsewhere): unregister this worker.
    Lock.acquire()
    InProcess.pop()
    Lock.release()
    return 0
开发者ID:yinjun322,项目名称:gogotester_python,代码行数:34,代码来源:Main_Gevent.py
示例7: time_diff
def time_diff(self, session, clock="sys"):
    """
    Measure how far apart the host and guest clocks are.

    The command, filter regex and time format are read from the test params
    named ``<clock>_time_command``, ``<clock>_time_filter_re`` and
    ``<clock>_time_format``; both readings are logged.

    :param session: ShellSession object;
    :param clock: 'hw' or 'sys'
    :return: absolute difference between host time and guest time
    :rtype: float
    """
    command, filter_re, fmt = [
        self.params.get(template % clock)
        for template in ("%s_time_command",
                         "%s_time_filter_re",
                         "%s_time_format")
    ]
    host_time, guest_time = self.get_time(session, command, filter_re, fmt)
    guest_ctime, host_ctime = time.ctime(guest_time), time.ctime(host_time)

    logging.info("Host %s Time: %s = %s epoch seconds " % (clock,
                                                            host_ctime,
                                                            host_time))
    logging.info("Guest %s Time: %s = %s epoch seconds" % (clock,
                                                            guest_ctime,
                                                            guest_time))
    return abs(host_time - guest_time)
开发者ID:mazhang,项目名称:virt-test,代码行数:27,代码来源:timedrift_adjust_time.py
示例8: up
def up(cursor):
    """Build collection.xml for every collection that still needs one.

    The work list comes from ``_need_collxml(cursor)`` and is processed in
    batches of 100.  The connection is committed after each batch and a
    progress line with an estimated completion time is logged.  The total
    runtime is logged at the end, also when there was nothing to build.

    :param cursor: database cursor; its ``connection`` is used to commit
    """
    to_build = _need_collxml(cursor)
    num_todo = len(to_build)
    batch_size = 100
    logger.info('collection.xml to generate: {}'.format(num_todo))
    logger.info('Batch size: {}'.format(batch_size))

    start = time.time()
    # Initial guess of 0.01 seconds per item, refined after every batch.
    guesstimate = 0.01 * num_todo
    guess_complete = guesstimate + start
    logger.info('Completion guess: '
                '"{}" ({})'.format(time.ctime(guess_complete),
                                   timedelta(0, guesstimate)))

    num_complete = 0
    # Bound before the loop: with an empty work list the loop body never
    # runs, and the final log line would otherwise hit an unbound name.
    elapsed = 0.0
    for batch in _batcher(to_build, batch_size):
        coll_idents = tuple(i[0] for i in batch)
        logger.debug('coll_idents {}'.format(coll_idents))

        for coll_ident in coll_idents:
            _build_collxml(coll_ident, cursor)

        # Commit per batch so completed work is kept if a later batch fails.
        cursor.connection.commit()
        num_complete += len(batch)
        percent_comp = num_complete * 100.0 / num_todo
        elapsed = time.time() - start
        # Extrapolate the remaining time from the average time per item so far.
        remaining_est = elapsed * (num_todo - num_complete) / num_complete
        est_complete = start + elapsed + remaining_est
        logger.info('{:.1f}% complete '
                    'est: "{}" ({})'.format(percent_comp,
                                            time.ctime(est_complete),
                                            timedelta(0, remaining_est)))

    logger.info('Total runtime: {}'.format(timedelta(0, elapsed)))
开发者ID:pumazi,项目名称:cnx-db,代码行数:35,代码来源:20181023135904_build-collxml.py
示例9: github_list_pull_requests
def github_list_pull_requests(urls, numbers_only=False):
    """
    Returns the pull requests numbers.

    It returns a tuple of (nonmergeable, mergeable), where "nonmergeable"
    and "mergeable" are lists of the pull requests numbers.

    Both groups are also printed, ordered by creation time.  With
    ``numbers_only`` only the numbers are printed, on a single line;
    otherwise repository, branch, author and creation date are shown for
    each pull request.  A pull request whose "mergeable" value is falsy is
    listed as non-mergeable.
    """
    # Local import: calendar.timegm() is the UTC counterpart of time.mktime().
    import calendar

    pulls = github_get_pull_request_all(urls)
    formated_pulls = []
    print("Total pull count %d" % len(pulls))
    sys.stdout.write("Processing pulls...")
    for pull in pulls:
        n = pull["number"]
        sys.stdout.write(" %d" % n)
        sys.stdout.flush()
        pull_info = github_get_pull_request(urls, n)
        mergeable = pull_info["mergeable"]
        head = pull["head"]
        repo = head["repo"]["html_url"] if head["repo"] else None
        branch = head["ref"]
        # "created_at" ends in "Z", i.e. it is UTC.  time.mktime() would read
        # the parsed value as local time and shift the instant by the local
        # UTC offset; timegm() converts it to the correct epoch seconds.
        created_at = calendar.timegm(
            time.strptime(pull["created_at"], "%Y-%m-%dT%H:%M:%SZ"))
        username = head["user"]["login"]
        user_info = github_get_user_info(urls, username)
        author = "\"%s\" <%s>" % (user_info.get("name", "unknown"),
                                  user_info.get("email", ""))
        formated_pulls.append((created_at, n, repo, branch, author, mergeable))
    formated_pulls.sort(key=lambda x: x[0])

    print("\nPatches that cannot be merged without conflicts:")
    nonmergeable = _print_pull_requests(
        [entry for entry in formated_pulls if not entry[5]], numbers_only)
    print("")
    print("-" * 80)
    print("Patches that can be merged without conflicts:")
    mergeable_list = _print_pull_requests(
        [entry for entry in formated_pulls if entry[5]], numbers_only)
    return nonmergeable, mergeable_list


def _print_pull_requests(entries, numbers_only):
    """Print one section of the report; return the pull request numbers in it."""
    if numbers_only:
        # All numbers on one line, separated by single spaces.
        print(" ".join(str(entry[1]) for entry in entries))
    else:
        for created_at, n, repo, branch, author, _ in entries:
            print("#%03d: %s %s" % (n, repo, branch))
            print(unicode(" Author : %s" % author).encode('utf8'))
            print(" Date : %s" % time.ctime(created_at))
    return [int(entry[1]) for entry in entries]
开发者ID:flacjacket,项目名称:sympy-bot,代码行数:60,代码来源:github.py
示例10: set_up_landmines
def set_up_landmines(target):
    """Plant the landmines file for *target*, or record how it has changed.

    On the first run the current landmines are written to ``.landmines`` in
    the target's build directory (created if missing).  On later runs, when
    the stored landmines differ from the current ones a unified diff is
    written to ``.landmines_triggered``; when they match, a leftover
    ``.landmines_triggered`` file is removed.
    """
    build_dir = get_target_build_dir(builder(), target, platform() == "ios")
    stored_path = os.path.join(build_dir, ".landmines")
    if not os.path.exists(build_dir):
        os.makedirs(build_dir)
    current = get_landmines(target)

    # First run: just plant the landmines.
    if not os.path.exists(stored_path):
        with open(stored_path, "w") as handle:
            handle.writelines(current)
        return

    trigger_path = os.path.join(build_dir, ".landmines_triggered")
    with open(stored_path, "r") as handle:
        previous = handle.readlines()

    if previous != current:
        stored_date = time.ctime(os.stat(stored_path).st_ctime)
        delta = difflib.unified_diff(
            previous,
            current,
            fromfile="old_landmines",
            tofile="new_landmines",
            fromfiledate=stored_date,
            tofiledate=time.ctime(),
            n=0,
        )
        with open(trigger_path, "w") as handle:
            handle.writelines(delta)
        return

    # Nothing changed: a trigger file left behind would be a false alarm.
    if os.path.exists(trigger_path):
        os.remove(trigger_path)
开发者ID:noscripter,项目名称:cefode-chromium,代码行数:34,代码来源:landmines.py
示例11: MPrun
def MPrun(hostname, scriptfile):
    '''
    Integrate resource allocation information and send it to salt-master

    Every regular file in the directory ``scriptfile`` is run with
    /usr/bin/python.  A script counts as successful when it exits with
    status 0 and its output evaluates to a dict; the items of that dict are
    merged into the module-level ``sendinfo``.  Failures are reported on
    stdout and the script is skipped.  Finally ``sendsalt()`` is called.

    :param hostname: not used by this function
    :param scriptfile: path of the directory that holds the scripts
    :return: send info -> salt-master
    '''
    global sendinfo
    sys.stdout.write("[ %s ] Start to get the data...\n" % time.ctime())
    sys.stdout.flush()
    sendinfo = {}
    for f in os.listdir(scriptfile):
        if not os.path.isfile("%s/%s" % (scriptfile, f)):
            sys.stdout.write("[ %s ] ERROR : No such file %s\n" % (time.ctime(), f))
            sys.stdout.flush()
            time.sleep(0.5)
            continue

        # NOTE(review): the file name is interpolated into a shell command
        # line; names containing shell metacharacters would be interpreted.
        result = commands.getstatusoutput("/usr/bin/python %s/%s" % (scriptfile, f))
        data = None
        if result[0] == 0:
            try:
                # NOTE(review): eval() executes whatever the script printed.
                # Kept for compatibility with existing scripts; consider
                # ast.literal_eval if their output is always a literal dict.
                # It is evaluated once only, not once per use.
                data = eval(result[1])
            except Exception:
                # Output that is not a valid expression is reported below
                # like any other non-dict result, not raised as a crash.
                data = None

        if isinstance(data, dict):
            sendinfo.update(data)
        else:
            sys.stdout.write("[ %s ] ERROR : Please check the script for %s and the script return value must be dictionary type\n" % (time.ctime(), f))
            sys.stdout.flush()
            time.sleep(0.5)

    sys.stdout.write("\n[ %s ] system resource configuration information :\n%s\n\n" % (time.ctime(), sendinfo))
    sys.stdout.flush()
    sendsalt()
    return
开发者ID:iTraceur,项目名称:goms,代码行数:32,代码来源:minioninfor.py
示例12: set_up_landmines
def set_up_landmines(target, new_landmines):
    """Store *new_landmines* for *target*, or write a diff if they changed.

    The landmines live in ``.landmines`` inside the target's build directory,
    which is created when missing.  If the file does not exist yet it is
    written.  Otherwise its lines are compared with *new_landmines*: a
    difference is written as a unified diff to ``.landmines_triggered``, and
    when nothing differs an existing ``.landmines_triggered`` is deleted.
    """
    build_tool = landmine_utils.builder()
    targets_ios = landmine_utils.platform() == 'ios'
    out_dir = get_target_build_dir(build_tool, target, targets_ios)
    landmines_file = os.path.join(out_dir, '.landmines')
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    first_run = not os.path.exists(landmines_file)
    if first_run:
        with open(landmines_file, 'w') as sink:
            sink.writelines(new_landmines)
    else:
        trigger_file = os.path.join(out_dir, '.landmines_triggered')
        with open(landmines_file, 'r') as source:
            recorded = source.readlines()
        if recorded != new_landmines:
            recorded_date = time.ctime(os.stat(landmines_file).st_ctime)
            changes = difflib.unified_diff(
                recorded, new_landmines,
                fromfile='old_landmines', tofile='new_landmines',
                fromfiledate=recorded_date, tofiledate=time.ctime(), n=0)
            with open(trigger_file, 'w') as sink:
                sink.writelines(changes)
        elif os.path.exists(trigger_file):
            # The landmines match again, so the trigger file is a false alarm.
            os.remove(trigger_file)
开发者ID:Metrological,项目名称:chromium,代码行数:27,代码来源:landmines.py
示例13: download_list
def download_list(srcdirs=None, srcfiles=None, hostname='http://127.0.0.1'):
    """Bring the local file tree in line with the source file list.

    Local files that are not in *srcfiles* are removed, unless they were
    modified less than ``WHOLE_SYNC_TASK_EXPIRES_TIME + 3600`` seconds ago or
    a file with the same name is about to be downloaded.  Files in *srcfiles*
    that are missing locally are then downloaded through ``ThreadPool``.

    :param srcdirs: not used by this function
    :param srcfiles: iterable of ``(relative path, size)`` pairs
    :param hostname: base URL the relative paths are joined onto
    :return: the string ``'OK'``
    """
    # None stands in for "no files" so the defaults are not shared mutable lists.
    srcfiles = [] if srcfiles is None else srcfiles

    _, dstfiles = visitdir(dstmonitorpath, dstwwwroot)
    downloadfiles = set(srcfiles) - set(dstfiles)
    #rmdirs = set(dstdirs) - set(srcdirs)
    rmfiles = set(dstfiles) - set(srcfiles)
    logger.warn("rmfiles count=%s", len(rmfiles))
    download_count = len(downloadfiles)
    logger.warn("download tasks count = %s", download_count)
    expires_time = WHOLE_SYNC_TASK_EXPIRES_TIME + 3600

    # name -> suggested size, so each removal candidate is checked with one
    # lookup and the download set is not rescanned for every file.
    pending_sizes = dict(downloadfiles)

    for f, sz in rmfiles:
        try:
            # dstdir is not defined in this function; presumably module-level.
            local_path = join(dstdir, f)
            if not os.path.exists(local_path):
                logger.info('file=%s not exist, skip', local_path)
                continue
            localfilesz = getsize(local_path)

            last_modified_time = os.path.getmtime(local_path)
            time_now = time.time()
            if time_now - last_modified_time < expires_time:
                logger.info('file: %s now_time(%s) - last_modified_time(%s) = %ss less than whole_sync_task_expires_time(%ss), skip',
                            local_path, time.ctime(time_now), time.ctime(last_modified_time), time_now - last_modified_time, expires_time)
                continue

            # Same name with a different size: the file is about to be
            # re-downloaded, so it is not removed here.
            if f in pending_sizes:
                logger.info('file:%s is in download list, suggest_size(%s) =? scan_size(%s), real_size:%s, skip',
                            f, pending_sizes[f], sz, localfilesz)
                continue

            logger.info('Going to rm file: %s, filesize=%s', local_path, localfilesz)
            rmfile(None, local_path)
        except Exception:
            # Best effort per file: log with traceback and go on to the next one.
            logger.exception('check rm file: %s exception', f)

    if download_count > 0:
        ThreadPool.initialize()
        for (f, _size) in downloadfiles:
            url = join(hostname, f)
            ThreadPool.add_task_with_param(download, url)
        ThreadPool.wait_for_complete(timeout=3600*10)
        logger.warn("complete downloads")
        ThreadPool.clear_task()
        ThreadPool.stop()

    return 'OK'
开发者ID:gjwang,项目名称:openfilesync,代码行数:60,代码来源:tasks.py
示例14: run_brute_force
def run_brute_force(self, keys):
    """Evaluate the cost function for every row of ``self.bfinds``.

    ``self.logfile`` must be set before this method is called; a derived
    class should probably do that in its ``__init__``.  For each row the
    parameters are updated, the cost is computed with ``extra=1``, the peaks
    of the third value returned by the cost function are located, and the
    result is stored with ``save_row``.  Start and end times are printed.

    *keys* is accepted but not used by this method.
    """
    self.bfparams = copy.copy(self.params)
    self.build_param_mat()
    self.build_ind_mat()
    self.bflabels = self.bfkeys + ['e', 'p1', 'p2']
    self.clear_log()

    banner = 'start time: ' + time.ctime()
    print(banner)
    self.str_out(banner + '\n')

    for index_row in self.bfinds:
        self.bfparams = self.set_params(index_row, self.bfparams)
        error, _th_bode, gain_bode = self.cost(self.bfparams, extra=1)
        peaks = self.find_peaks(gain_bode)
        first_peak, second_peak = peaks
        self.save_row(self.bfparams, error, first_peak, second_peak)

    print('end time: ' + time.ctime())
开发者ID:ryanGT,项目名称:research,代码行数:28,代码来源:JVC_model.py
示例15: get
def get(self):
    """Answer one request for performance counter data.

    The query string must contain 'start' and 'end', the bounds of the time
    range to query, each given as seconds since the unix epoch.  The response
    is a JSON object with the group key, the two bounds and the serialized
    metrics found in that range.
    """
    start_time = float(self.get_argument('start'))
    end_time = float(self.get_argument('end'))

    # The last entry of LOGS_INTERVALS is used as the interval resolution.
    selected_interval = metric.LOGS_INTERVALS[-1]
    group_key = metric.Metric.EncodeGroupKey(metric.LOGS_STATS_NAME, selected_interval)
    logging.info('Query performance counters %s, range: %s - %s, resolution: %s'
                 % (group_key, time.ctime(start_time), time.ctime(end_time), selected_interval.name))

    self.set_header('Content-Type', 'application/json; charset=UTF-8')

    # Page through the results: each query resumes after the key of the last
    # metric received, until a query comes back empty.
    metrics = list()
    start_key = None
    while True:
        new_metrics = yield gen.Task(metric.Metric.QueryTimespan, self._client, group_key,
                                     start_time, end_time, excl_start_key=start_key)
        if len(new_metrics) == 0:
            break
        metrics.extend(new_metrics)
        start_key = metrics[-1].GetKey()

    payload = {
        'group_key': group_key,
        'start_time': start_time,
        'end_time': end_time,
        'data': _SerializeMetrics(metrics),
    }
    self.write(json.dumps(payload))
    self.finish()
开发者ID:00zhengfu00,项目名称:viewfinder,代码行数:32,代码来源:logs_counters.py
示例16: datread
def datread(self):
    """Load the 'pull' .dat data into ``self.dat``, caching it as a .npy file.

    For each entry of ``self.files`` whose 'type' is 'dat' and whose 'dir' is
    'pull', the modification times of the .dat file and of its .npy
    counterpart are printed and compared.  A .npy file older than the .dat is
    deleted.  The data is then loaded from the .npy file when it exists,
    otherwise from the .dat file, which is also saved in .npy format for
    faster reading in future.

    Sets ``self.datcheck``, ``self.path`` and ``self.dat``.

    NOTE(review): the block nesting was reconstructed from a copy that had
    lost its indentation; in particular the second ``dir == 'pull'`` check is
    assumed to sit inside the first ``if`` - confirm against the original.
    """
    for vals in self.files:
        if vals['type'] == 'dat' and vals['dir'] == 'pull':
            # self.datfileloc = vals['dir']
            pth = vals['path']
            # Only the first occurrence of '.dat' in the path is replaced.
            pthnpy = pth.replace('.dat', '.npy', 1)
            try:
                print("""\
DAT File:\t{0}
DAT: time last modified:\t{2}
NPY File:\t{1}
NPY: time last modified:\t{3}""".format(pth, pthnpy, \
                    time.ctime(path.getmtime(pth)), \
                    time.ctime(path.getmtime(pthnpy))))
                if path.getmtime(pth) <= path.getmtime(pthnpy):
                    print('DATFILE: is older than npy... Continue.\n')
                else:
                    print('DATFILE: is newer than npy... Remaking.\n')
                    # Drop the stale cache so it is rebuilt from the .dat below.
                    os.remove(pthnpy)
            except OSError:
                # Raised by getmtime() when a file is missing (for example
                # no .npy cache yet); the load below handles that case.
                pass
            if vals['dir'] == 'pull':
                self.datcheck = True
                self.path = pth
                if path.exists(pthnpy):
                    self.dat = np.load(pthnpy)
                else:
                    # No usable cache: parse the text file and write the cache.
                    self.dat = np.loadtxt(pth)
                    np.save(pthnpy, self.dat)
开发者ID:slushecl,项目名称:dev,代码行数:32,代码来源:core.py
示例17: timestamp
def timestamp():
    """Print the current date and time as a timestamp.

    Licensing:
        This code is distributed under the GNU LGPL license.

    Modified:
        06 April 2013

    Author:
        John Burkardt

    Parameters:
        None
    """
    import time

    print(time.ctime(time.time()))
    return None
开发者ID:johannesgerer,项目名称:jburkardt-py,代码行数:28,代码来源:fd1d_advection_lax_wendroff.py
示例18: test_run_task_timeout
async def test_run_task_timeout(context):
    """`run_task` raises `ScriptWorkerTaskException` once `task_max_timeout`
    is exceeded, and the task process is killed.
    """
    temp_dir = os.path.join(context.config['work_dir'], "timeout")
    context.config['task_script'] = (sys.executable, TIMEOUT_SCRIPT, temp_dir)
    # Shorter timeouts do not always leave the script enough time to create
    # all 6 files.
    context.config['task_max_timeout'] = 5

    started = arrow.utcnow().timestamp
    with pytest.raises(ScriptWorkerTaskException):
        await swtask.run_task(context, noop_to_cancellable_process)
    elapsed = arrow.utcnow().timestamp - started

    # These timing checks can make the test flaky, but a test that never ran
    # or that ran for too long must not pass.
    # Did the task run for the whole timeout?
    assert elapsed >= 5
    # Did it overrun, e.g. exit on its own without being killed?  A bound
    # too close to the timeout would not leave enough time for kill_proc,
    # kill_pid and the `finally` block to run.
    assert elapsed < 10

    def fingerprint(path):
        return (time.ctime(os.path.getmtime(path)), os.stat(path).st_size)

    # Did the script generate the expected output, and has it stopped writing?
    pattern = os.path.join(temp_dir, '*')
    files = {}
    for path in glob.glob(pattern):
        files[path] = fingerprint(path)
        print("{} {}".format(path, files[path]))
    for path in glob.glob(pattern):
        print("Checking {}...".format(path))
        assert files[path] == fingerprint(path)
    assert len(files) == 6

    # Did we clean up?
    assert context.proc is None
开发者ID:escapewindow,项目名称:scriptworker,代码行数:35,代码来源:test_task.py
示例19: checkFilesChanged
def checkFilesChanged(self, files, unchanged_for = 60):
    """Report whether any of *files* changed within the last *unchanged_for* seconds.

    :param files: iterable of file paths to check
    :param unchanged_for: number of seconds a file must have been left alone
    :return: ``(False, None)`` when every file exists and is old enough.
        Otherwise ``(file_too_new, time_string)``: ``file_too_new`` is a
        truthy number (the age in seconds of the offending file, or the
        current time when a file has disappeared) and ``time_string`` is a
        formatted file time, or ``'unknown'`` when none can be formatted.
    """
    now = time.time()
    file_too_new = False

    file_time = []
    for cur_file in files:

        # File got removed while checking
        if not os.path.isfile(cur_file):
            file_too_new = now
            break

        # File has changed within the last `unchanged_for` seconds
        file_time = self.getFileTimes(cur_file)
        for t in file_time:
            if t > now - unchanged_for:
                # An age below one second would otherwise become 0, which is
                # falsy and made a just-modified file look unchanged; report
                # at least 1.  (tryInt presumably converts to int - confirm.)
                file_too_new = tryInt(time.time() - t) or 1
                break

        if file_too_new:
            break

    if not file_too_new:
        return False, None

    # Format the first usable of the first two file times.
    time_string = 'unknown'
    for index in (0, 1):
        try:
            time_string = time.ctime(file_time[index])
            break
        except (IndexError, KeyError, TypeError, ValueError, OverflowError, OSError):
            continue

    return file_too_new, time_string
开发者ID:ebewo,项目名称:CouchPotatoServer,代码行数:34,代码来源:base.py
示例20: main
def main():
nfuncs = range(len(funcs))
print '*** SINGLE THREAD'
for i in nfuncs:
print 'starting', funcs[i].__name__, \
'at:', ctime()
print funcs[i](n)
print funcs[i].__name__, 'finished at:', \
ctime()
print '\n*** MULTIPLE THREADS'
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (n,),
funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print threads[i].getResult()
print 'all DONE'
开发者ID:fjrti,项目名称:snippets,代码行数:26,代码来源:mtfacfib3.py
注:本文中的time.ctime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论