本文整理汇总了Python中yappi.get_thread_stats函数的典型用法代码示例。如果您正苦于以下问题:Python get_thread_stats函数的具体用法?Python get_thread_stats怎么用?Python get_thread_stats使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_thread_stats函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: cmd_stop
def cmd_stop(self, args):
"""stop profiling"""
print "Profile results:"
import yappi # We do the import here so that we won't barf if run normally and yappi not available
yappi.get_func_stats().print_all(
columns={0: ("name", 50), 1: ("ncall", 5), 2: ("tsub", 8), 3: ("ttot", 8), 4: ("tavg", 8)}
)
yappi.get_thread_stats().print_all()
yappi.stop()
开发者ID:StudentCopters,项目名称:MAVProxy,代码行数:10,代码来源:mavproxy_profile.py
示例2: yappi_prof_call
def yappi_prof_call(func, *args):
'''
https://code.google.com/p/yappi/wiki/usageyappi_v092
'''
import yappi
yappi.start()
result = func(*args)
yappi.get_func_stats().print_all()
yappi.get_thread_stats().print_all()
return result
开发者ID:aiex,项目名称:requestspool,代码行数:10,代码来源:geventwsgi.py
示例3: main
def main():
print('Main TID: {}'.format(gettid()))
args = Bunch(channel=None,
devices=[],
generate=False,
ui='main.ui')
yappi.start()
exit_value = epyqlib.__main__.main(args=args)
yappi.stop()
yappi.get_func_stats().save('yappi.stats', type='pstat')
yappi.get_thread_stats().print_all()
return exit_value
开发者ID:altendky,项目名称:st,代码行数:12,代码来源:profile_yappi.py
示例4: main
def main(argv):
parser = argparse.ArgumentParser(description='Convert a FASTA and QUAL pair of files into a FASQ file')
parser.add_argument('fasta_filename',
help='the FASTA format input file path')
parser.add_argument('qual_filename',
help='the QUAL format input file path')
parser.add_argument('-q', action='store_true',
help="run quietly")
parser.add_argument('-d', action='store_true',
help="print diagnostics to stderr")
parser.add_argument('-z','--gzip', action='store_true',
help='input & output are gzipped')
parser.add_argument('-p', '--profile', action='store_true',
help="profile this execution")
parser.add_argument('-o', '--output',
default=sys.stdout, type=argparse.FileType('w'),
help='the output FASTQ file path')
args = parser.parse_args()
global debug_flag
debug_flag = args.d
debug_flag and sys.stderr.write("%r\n" % args)
if args.profile:
import yappi # https://code.google.com/p/yappi/wiki/apiyappi
yappi.start(builtins=True)
if args.gzip:
import gzip
fasta = gzip.open(args.fasta_filename)
qual = gzip.open(args.qual_filename)
outf = gzip.GzipFile(fileobj=args.output, mode='wb')
else:
fasta = open(args.fasta_filename)
qual = open(args.qual_filename)
outf = args.output
# alphabet irrelevant (and makes it slow) records = PairedFastaQualIterator(fasta, qual, alphabet=IUPAC.ambiguous_dna)
records = PairedFastaQualIterator(fasta, qual)
count = SeqIO.write(records, outf, "fastq")
outf.close()
if args.profile:
yappi.get_thread_stats().print_all(sys.stderr)
yappi.get_func_stats().sort("subtime").print_all(sys.stderr)
print("Converted %i records" % count)
开发者ID:mattsoulanille,项目名称:Sprock,代码行数:48,代码来源:fandq2fq.py
示例5: test_print_formatting
def test_print_formatting(self):
def a():
pass
def b():
a()
func_cols = {1: ("name", 48), 0: ("ncall", 5), 2: ("tsub", 8)}
thread_cols = {1: ("name", 48), 0: ("ttot", 8)}
yappi.start()
a()
b()
yappi.stop()
fs = yappi.get_func_stats()
cs = fs[1].children
ts = yappi.get_thread_stats()
# fs.print_all(out=sys.stderr, columns={1:("name", 70), })
# cs.print_all(out=sys.stderr, columns=func_cols)
# ts.print_all(out=sys.stderr, columns=thread_cols)
# cs.print_all(out=sys.stderr, columns={})
self.assertRaises(yappi.YappiError, fs.print_all, columns={1: ("namee", 9)})
self.assertRaises(yappi.YappiError, cs.print_all, columns={1: ("dd", 0)})
self.assertRaises(yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)})
开发者ID:smeder,项目名称:GreenletProfiler,代码行数:25,代码来源:test_functionality.py
示例6: test_basic
def test_basic(self):
yappi.set_clock_type('wall')
def dummy():
pass
def a():
time.sleep(0.2)
class Worker1(threading.Thread):
def a(self):
time.sleep(0.3)
def run(self):
self.a()
yappi.start(builtins=False, profile_threads=True)
c = Worker1()
c.start()
c.join()
a()
stats = yappi.get_func_stats()
fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
fsa2 = utils.find_stat_by_name(stats, 'a')
self.assertTrue(fsa1 is not None)
self.assertTrue(fsa2 is not None)
self.assertTrue(fsa1.ttot > 0.2)
self.assertTrue(fsa2.ttot > 0.1)
tstats = yappi.get_thread_stats()
self.assertEqual(len(tstats), 2)
tsa = utils.find_stat_by_name(tstats, 'Worker1')
tsm = utils.find_stat_by_name(tstats, '_MainThread')
dummy() # call dummy to force ctx name to be retrieved again.
self.assertTrue(tsa is not None)
# TODO: I put dummy() to fix below, remove the comments after a while.
self.assertTrue( # FIX: I see this fails sometimes?
tsm is not None,
'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats))))
开发者ID:sumerc,项目名称:yappi,代码行数:34,代码来源:test_functionality.py
示例7: test_callback
def test_callback(self):
self.context_id = 0
yappi.set_context_id_callback(lambda: self.context_id)
yappi.start()
a()
self.context_id = 1
a()
self.context_id = 2
a()
# Re-schedule context 1.
self.context_id = 1
a()
yappi.stop()
threadstats = yappi.get_thread_stats().sort('id', 'ascending')
self.assertEqual(3, len(threadstats))
self.assertEqual(0, threadstats[0].id)
self.assertEqual(1, threadstats[1].id)
self.assertEqual(2, threadstats[2].id)
self.assertEqual(1, threadstats[0].sched_count)
self.assertEqual(2, threadstats[1].sched_count) # Context 1 ran twice.
self.assertEqual(1, threadstats[2].sched_count)
funcstats = yappi.get_func_stats()
self.assertEqual(4, utils.find_stat_by_name(funcstats, 'a').ncall)
开发者ID:sumerc,项目名称:yappi,代码行数:27,代码来源:test_hooks.py
示例8: test_basic
def test_basic(self):
import threading
import time
yappi.set_clock_type('wall')
def a():
time.sleep(0.2)
class Worker1(threading.Thread):
def a(self):
time.sleep(0.3)
def run(self):
self.a()
yappi.start(builtins=False, profile_threads=True)
c = Worker1()
c.start()
c.join()
a()
stats = yappi.get_func_stats()
fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
fsa2 = utils.find_stat_by_name(stats, 'a')
self.assertTrue(fsa1 is not None)
self.assertTrue(fsa2 is not None)
self.assertTrue(fsa1.ttot > 0.2)
self.assertTrue(fsa2.ttot > 0.1)
tstats = yappi.get_thread_stats()
self.assertEqual(len(tstats), 2)
tsa = utils.find_stat_by_name(tstats, 'Worker1')
tsm = utils.find_stat_by_name(tstats, '_MainThread')
self.assertTrue(tsa is not None)
self.assertTrue(tsm is not None) # FIX: I see this fails sometimes?
开发者ID:pombredanne,项目名称:yappi,代码行数:30,代码来源:test_functionality.py
示例9: test_merge_multithreaded_stats
def test_merge_multithreaded_stats(self):
import threading
import _yappi
timings = {"a_1":2, "b_1":1}
_yappi._set_test_timings(timings)
def a(): pass
def b(): pass
yappi.start()
t = threading.Thread(target=a)
t.start()
t.join()
t = threading.Thread(target=b)
t.start()
t.join()
yappi.get_func_stats().save("ystats1.ys")
yappi.clear_stats()
_yappi._set_test_timings(timings)
self.assertEqual(len(yappi.get_func_stats()), 0)
self.assertEqual(len(yappi.get_thread_stats()), 1)
t = threading.Thread(target=a)
t.start()
t.join()
self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
self.assertEqual(_yappi._get_start_flags()["profile_multithread"], 1)
yappi.get_func_stats().save("ystats2.ys")
stats = yappi.YFuncStats(["ystats1.ys", "ystats2.ys",])
fsa = utils.find_stat_by_name(stats, "a")
fsb = utils.find_stat_by_name(stats, "b")
self.assertEqual(fsa.ncall, 2)
self.assertEqual(fsb.ncall, 1)
self.assertEqual(fsa.tsub, fsa.ttot, 4)
self.assertEqual(fsb.tsub, fsb.ttot, 1)
开发者ID:pombredanne,项目名称:yappi,代码行数:34,代码来源:test_functionality.py
示例10: test_pause_resume
def test_pause_resume(self):
yappi.set_context_id_callback(lambda: self.context_id)
yappi.set_clock_type('wall')
# Start in context 0.
self.context_id = 0
yappi.start()
time.sleep(0.08)
# Switch to context 1.
self.context_id = 1
time.sleep(0.05)
# Switch back to context 0.
self.context_id = 0
time.sleep(0.07)
yappi.stop()
t_stats = yappi.get_thread_stats().sort('id', 'ascending')
self.assertEqual(2, len(t_stats))
self.assertEqual(0, t_stats[0].id)
self.assertEqual(2, t_stats[0].sched_count)
self.assertTrue(0.15 < t_stats[0].ttot < 0.3)
self.assertEqual(1, t_stats[1].id)
self.assertEqual(1, t_stats[1].sched_count)
# Context 1 was first scheduled 0.08 sec after context 0.
self.assertTrue(0.1 < t_stats[1].ttot < 0.2 )
开发者ID:sumerc,项目名称:yappi,代码行数:29,代码来源:test_hooks.py
示例11: test_start_flags
def test_start_flags(self):
self.assertEqual(_yappi._get_start_flags(), None)
yappi.start()
def a(): pass
a()
self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
self.assertEqual(_yappi._get_start_flags()["profile_multithread"], 1)
self.assertEqual(len(yappi.get_thread_stats()), 1)
开发者ID:pombredanne,项目名称:yappi,代码行数:8,代码来源:test_functionality.py
示例12: teardown_profiler
def teardown_profiler(self):
if not self.is_profiling():
return
path = os.environ["DRIVE_YAPPI"]
if not os.path.exists(path):
os.mkdir(path)
report_path = os.path.join(path, self.id() + '-yappi-threads')
with open(report_path, 'w') as fd:
columns = {0: ("name", 80), 1: ("tid", 15), 2: ("ttot", 8), 3: ("scnt", 10)}
yappi.get_thread_stats().print_all(out=fd, columns=columns)
report_path = os.path.join(path, self.id() + '-yappi-fcts')
with open(report_path, 'w') as fd:
columns = {0: ("name", 80), 1: ("ncall", 5), 2: ("tsub", 8), 3: ("ttot", 8), 4: ("tavg", 8)}
stats = yappi.get_func_stats()
stats.strip_dirs()
stats.print_all(out=fd, columns=columns)
log.debug("Profiler Report generated in '%s'", report_path)
开发者ID:RajaDG-OpenSource,项目名称:nuxeo-drive,代码行数:17,代码来源:common_unit_test.py
示例13: test_subsequent_profile
def test_subsequent_profile(self):
import threading
WORKER_COUNT = 5
def a(): pass
def b(): pass
def c(): pass
_timings = {"a_1":3,"b_1":2,"c_1":1,}
yappi.start()
def g(): pass
g()
yappi.stop()
yappi.clear_stats()
_yappi._set_test_timings(_timings)
yappi.start()
_dummy = []
for i in range(WORKER_COUNT):
t = threading.Thread(target=a)
t.start()
t.join()
for i in range(WORKER_COUNT):
t = threading.Thread(target=b)
t.start()
_dummy.append(t)
t.join()
for i in range(WORKER_COUNT):
t = threading.Thread(target=a)
t.start()
t.join()
for i in range(WORKER_COUNT):
t = threading.Thread(target=c)
t.start()
t.join()
yappi.stop()
yappi.start()
def f():
pass
f()
stats = yappi.get_func_stats()
fsa = utils.find_stat_by_name(stats, 'a')
fsb = utils.find_stat_by_name(stats, 'b')
fsc = utils.find_stat_by_name(stats, 'c')
self.assertEqual(fsa.ncall, 10)
self.assertEqual(fsb.ncall, 5)
self.assertEqual(fsc.ncall, 5)
self.assertEqual(fsa.ttot, fsa.tsub, 30)
self.assertEqual(fsb.ttot, fsb.tsub, 10)
self.assertEqual(fsc.ttot, fsc.tsub, 5)
# MACOSx optimizes by only creating one worker thread
self.assertTrue(len(yappi.get_thread_stats()) >= 2)
开发者ID:pombredanne,项目名称:yappi,代码行数:53,代码来源:test_functionality.py
示例14: sys_exit_call
def sys_exit_call(signum, frame):
print "profiling result:"
yappi.get_func_stats().print_all(columns={0:("name",100), 1:("ncall", 15),
2:("tsub", 8), 3:("ttot", 8), 4:("tavg",8)})
yappi.get_thread_stats().print_all()
sys.exit(1)
开发者ID:jimhorng,项目名称:python-test,代码行数:6,代码来源:profiling.py
示例15: main
if __name__ == "__main__":
if DEBUG:
sys.argv.append("-h")
if TESTRUN:
import doctest
doctest.testmod()
if PROFILE:
import pstats
#Use yappi in multithreaded context
import yappi
yappi.start()
exit_code = main()
yappi.stop()
stats = yappi.get_func_stats()
tstats = yappi.get_thread_stats()
STAT_FILE = 'profile_stats{}'
stats.save(STAT_FILE.format('.pstat'), type='pstat')
with open(STAT_FILE.format('.txt'), 'w') as fh :
# pstat format better than yappi format
ps = pstats.Stats(STAT_FILE.format('.pstat'), stream = fh)
ps.sort_stats('time')
ps.print_stats()
ps.sort_stats('cumulative')
ps.print_stats()
#append thread stats
tstats.print_all(out = fh)
#add function stat with yappi format
stats.print_all(out=fh)
开发者ID:OpHaCo,项目名称:smart_coffee_machine,代码行数:29,代码来源:detect_smile.py
示例16: bar
import yappi
#yappi.set_clock_type("waLL")
#yappi.start(builtins=True)
yappi.start()
class A:
def bar(self):
pass
def foo():
def inner_foo():
pass
import time
time.sleep(2.0)
for i in range(20000000):
pass
a = A()
a.bar()
inner_foo()
foo()
#yappi.write_callgrind_stats()
yappi.get_func_stats().sort("totaLTiMe").print_all()
yappi.get_thread_stats().print_all()
#import cProfile
#cProfile.run('foo()', 'fooprof')
#import pstats
#p = pstats.Stats('fooprof')
#p.strip_dirs().sort_stats(-1).print_stats()
开发者ID:socialpercon,项目名称:yappi,代码行数:27,代码来源:simple.py
示例17: test_ctx_stats
def test_ctx_stats(self):
from threading import Thread
DUMMY_WORKER_COUNT = 5
yappi.start()
class DummyThread(Thread):
pass
def dummy():
pass
def dummy_worker():
pass
for i in range(DUMMY_WORKER_COUNT):
t = DummyThread(target=dummy_worker)
t.start()
t.join()
yappi.stop()
stats = yappi.get_thread_stats()
tsa = utils.find_stat_by_name(stats, "DummyThread")
self.assertTrue(tsa is not None)
yappi.clear_stats()
time.sleep(1.0)
_timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1, "a_2": 4, "b_2": 3, "c_2": 2, "d_2": 1}
_yappi._set_test_timings(_timings)
class Thread1(Thread):
pass
class Thread2(Thread):
pass
def a():
b()
def b():
c()
def c():
d()
def d():
time.sleep(0.6)
yappi.set_clock_type("wall")
yappi.start()
t1 = Thread1(target=a)
t1.start()
t2 = Thread2(target=a)
t2.start()
t1.join()
t2.join()
stats = yappi.get_thread_stats()
# the fist clear_stats clears the context table?
tsa = utils.find_stat_by_name(stats, "DummyThread")
self.assertTrue(tsa is None)
tst1 = utils.find_stat_by_name(stats, "Thread1")
tst2 = utils.find_stat_by_name(stats, "Thread2")
tsmain = utils.find_stat_by_name(stats, "_MainThread")
dummy() # call dummy to force ctx name to be retrieved again.
self.assertTrue(len(stats) == 3)
self.assertTrue(tst1 is not None)
self.assertTrue(tst2 is not None)
# TODO: I put dummy() to fix below, remove the comments after a while.
self.assertTrue( # FIX: I see this fails sometimes
tsmain is not None, 'Could not find "_MainThread". Found: %s' % (", ".join(utils.get_stat_names(stats)))
)
self.assertTrue(1.0 > tst2.ttot >= 0.5)
self.assertTrue(1.0 > tst1.ttot >= 0.5)
# test sorting of the ctx stats
stats = stats.sort("totaltime", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.ttot >= stat.ttot)
prev_stat = stat
stats = stats.sort("totaltime", "asc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.ttot <= stat.ttot)
prev_stat = stat
stats = stats.sort("schedcount", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.sched_count >= stat.sched_count)
prev_stat = stat
stats = stats.sort("name", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.name >= stat.name)
prev_stat = stat
self.assertRaises(yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg")
#.........这里部分代码省略.........
开发者ID:smeder,项目名称:GreenletProfiler,代码行数:101,代码来源:test_functionality.py
示例18: output_error
output_error("Specify scene file", output_function)
exit(1)
else:
scene_desc = pickle.load(sys.stdin)
pstd = PSTD(args.multi_threaded, args.write_plot, args.write_array, scene_desc, output_function)
out = sys.__stdout__
if args.file:
print("Writing to %s"%args.file)
out = open(args.file,'w')
out.write("openPSTD --- %s\n"%datetime.now().strftime('%d-%m-%y %H:%M'))
out.write("Profile output\n"
"Scene file %s\n"
"Parallel: %s\n"
"Plotting: %s\n"
"Writing arrays: %s\n\n"%(args.scene_file,args.multi_threaded,args.write_plot,args.write_array))
time_start = time.time()
yappi.start()
pstd.run()
yappi.stop()
time_passed = time.time() - time_start
out.write("Time passed: %.3f\n"%time_passed)
funcstats = yappi.get_func_stats()
funcstats.sort(sort_type='tsub', sort_order="desc")
funcstats.print_all(out=out,columns={0:("name",36), 1:("ncall", 8), 2:("tsub", 8), 3:("ttot", 8), 4:("tavg",8)})
yappi.get_thread_stats().print_all(out=out)
if args.file:
out.close()
exit()
# yappi.start()
# foo()
开发者ID:MHAcoustics,项目名称:openPSTD,代码行数:30,代码来源:profile_simulation.py
示例19: lineReceived
def lineReceived(self, line):
anon_tunnel = self.anon_tunnel
profile = self.profile
if line == 'threads':
for thread in threading.enumerate():
print "%s \t %d" % (thread.name, thread.ident)
elif line == 'p':
if profile:
for func_stats in yappi.get_func_stats().sort("subtime")[:50]:
print "YAPPI: %10dx %10.3fs" % (func_stats.ncall, func_stats.tsub), func_stats.name
else:
logger.error("Profiling disabled!")
elif line == 'P':
if profile:
filename = 'callgrindc_%d.yappi' % anon_tunnel.dispersy.lan_address[1]
yappi.get_func_stats().save(filename, type='callgrind')
else:
logger.error("Profiling disabled!")
elif line == 't':
if profile:
yappi.get_thread_stats().sort("totaltime").print_all()
else:
logger.error("Profiling disabled!")
elif line == 'c':
print "========\nCircuits\n========\nid\taddress\t\t\t\t\tgoal\thops\tIN (MB)\tOUT (MB)\tinfohash\ttype"
for circuit_id, circuit in anon_tunnel.community.circuits.items():
info_hash = circuit.info_hash.encode('hex')[:10] if circuit.info_hash else '?'
print "%d\t%s:%d\t%d\t%d\t\t%.2f\t\t%.2f\t\t%s\t%s" % (circuit_id,
circuit.first_hop[0],
circuit.first_hop[1],
circuit.goal_hops,
len(circuit.hops),
circuit.bytes_down / 1024.0 / 1024.0,
circuit.bytes_up / 1024.0 / 1024.0,
info_hash,
circuit.ctype)
elif line.startswith('s'):
cur_path = os.getcwd()
line_split = line.split(' ')
filename = 'test_file' if len(line_split) == 1 else line_split[1]
if not os.path.exists(filename):
logger.info("Creating torrent..")
with open(filename, 'wb') as fp:
fp.write(os.urandom(50 * 1024 * 1024))
tdef = TorrentDef()
tdef.add_content(os.path.join(cur_path, filename))
tdef.set_tracker("udp://fake.net/announce")
tdef.set_private()
tdef.finalize()
tdef.save(os.path.join(cur_path, filename + '.torrent'))
else:
logger.info("Loading existing torrent..")
tdef = TorrentDef.load(filename + '.torrent')
logger.info("loading torrent done, infohash of torrent: %s" % (tdef.get_infohash().encode('hex')[:10]))
defaultDLConfig = DefaultDownloadStartupConfig.getInstance()
dscfg = defaultDLConfig.copy()
dscfg.set_hops(1)
dscfg.set_dest_dir(cur_path)
anon_tunnel.session.lm.threadpool.call(0, anon_tunnel.session.start_download, tdef, dscfg)
elif line.startswith('i'):
# Introduce dispersy port from other main peer to this peer
line_split = line.split(' ')
to_introduce_ip = line_split[1]
to_introduce_port = int(line_split[2])
self.anon_tunnel.community.add_discovered_candidate(Candidate((to_introduce_ip, to_introduce_port), tunnel=False))
elif line.startswith('d'):
line_split = line.split(' ')
filename = 'test_file' if len(line_split) == 1 else line_split[1]
logger.info("Loading torrent..")
tdef = TorrentDef.load(filename + '.torrent')
logger.info("Loading torrent done")
defaultDLConfig = DefaultDownloadStartupConfig.getInstance()
dscfg = defaultDLConfig.copy()
dscfg.set_hops(1)
dscfg.set_dest_dir(os.path.join(os.getcwd(), 'downloader%s' % anon_tunnel.session.get_dispersy_port()))
def start_download():
def cb(ds):
logger.info('Download infohash=%s, down=%s, progress=%s, status=%s, seedpeers=%s, candidates=%d' %
(tdef.get_infohash().encode('hex')[:10],
ds.get_current_speed('down'),
ds.get_progress(),
dlstatus_strings[ds.get_status()],
sum(ds.get_num_seeds_peers()),
sum(1 for _ in anon_tunnel.community.dispersy_yield_verified_candidates())))
return 1.0, False
download = anon_tunnel.session.start_download(tdef, dscfg)
download.set_state_callback(cb, delay=1)
#.........这里部分代码省略.........
开发者ID:Antiade,项目名称:tribler,代码行数:101,代码来源:main.py
示例20: test_ctx_stats
def test_ctx_stats(self):
from threading import Thread
DUMMY_WORKER_COUNT = 5
yappi.start()
class DummyThread(Thread): pass
def dummy_worker():
pass
for i in range(DUMMY_WORKER_COUNT):
t = DummyThread(target=dummy_worker)
t.start()
t.join()
yappi.stop()
stats = yappi.get_thread_stats()
tsa = utils.find_stat_by_name(stats, "DummyThread")
self.assertTrue(tsa is not None)
yappi.clear_stats()
import time
time.sleep(1.0)
_timings = {"a_1":6,"b_1":5,"c_1":3, "d_1":1, "a_2":4,"b_2":3,"c_2":2, "d_2":1}
_yappi._set_test_timings(_timings)
class Thread1(Thread): pass
class Thread2(Thread): pass
def a():
b()
def b():
c()
def c():
d()
def d():
time.sleep(0.6)
yappi.set_clock_type("wall")
yappi.start()
t1 = Thread1(target=a)
t1.start()
t2 = Thread2(target=a)
t2.start()
t1.join()
t2.join()
stats = yappi.get_thread_stats()
# the fist clear_stats clears the context table?
tsa = utils.find_stat_by_name(stats, "DummyThread")
self.assertTrue(tsa is None)
tst1 = utils.find_stat_by_name(stats, "Thread1")
tst2 = utils.find_stat_by_name(stats, "Thread2")
tsmain = utils.find_stat_by_name(stats, "_MainThread")
#stats.print_all()
self.assertTrue(len(stats) == 3)
self.assertTrue(tst1 is not None)
self.assertTrue(tst2 is not None)
self.assertTrue(tsmain is not None) # I see this fails sometimes, probably
# because Py_ImportNoBlock() fails to import and get the thread class name
# sometimes.
self.assertTrue(1.0 > tst2.ttot >= 0.5)
self.assertTrue(1.0 > tst1.ttot >= 0.5)
# test sorting of the ctx stats
stats = stats.sort("totaltime", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.ttot >= stat.ttot)
prev_stat = stat
stats = stats.sort("totaltime", "asc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.ttot <= stat.ttot)
prev_stat = stat
stats = stats.sort("schedcount", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.sched_count >= stat.sched_count)
prev_stat = stat
stats = stats.sort("name", "desc")
prev_stat = None
for stat in stats:
if prev_stat:
self.assertTrue(prev_stat.name >= stat.name)
prev_stat = stat
self.assertRaises(yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg")
self.assertRaises(yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg")
开发者ID:pombredanne,项目名称:yappi,代码行数:85,代码来源:test_functionality.py
注:本文中的yappi.get_thread_stats函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论