本文整理汇总了Python中nose.case函数的典型用法代码示例。如果您正苦于以下问题:Python case函数的具体用法?Python case怎么用?Python case使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了case函数的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: collect_tasks
def collect_tasks(self, test, tasks_queue, tasks_list, to_teardown, result):
    """
    Walk the test tree and route every member to one of three outcomes.

    * Members that are already failures are run immediately against
      ``result`` so the failure is recorded.
    * Context suites with shared fixtures are set up here and then
      traversed recursively; suites whose setUp succeeded are appended
      to ``to_teardown``.
    * Everything else is handed to ``add_task_to_queue``.

    Progress is reported on ``self.stream``: one dot per inspected
    member, followed by the final size of ``tasks_queue``.

    :param test: Test or a collection of tests (test suite)
    :param tasks_queue: Queue of pending tasks, filled by
        ``add_task_to_queue``
    :type tasks_queue: list
    :param tasks_list: Task names, filled by ``add_task_to_queue``
    :type tasks_list: list
    :param to_teardown: List populated with the suites set up here
    :type to_teardown: list
    :param result: Result object that receives failures and setUp errors
    """
    # Only addresses go on the queue because tests aren't picklable.
    self.stream.write("Inspecting test tree for distributable tests...")
    for case in self.get_test_batch(test):
        self.stream.write(".")

        broken_test = (isinstance(case, nose.case.Test) and
                       isinstance(case.test, failure.Failure))
        is_suite = isinstance(case, ContextSuite)
        if broken_test or (is_suite and case.context is failure.Failure):
            case(result)  # run here to capture the failure
            continue

        if not (is_suite and has_shared_fixtures(case)):
            # test_addr is the exact string that was put in tasks_list
            test_addr = add_task_to_queue(case, tasks_queue, tasks_list)
            log.debug("Queued test %s (%s)", len(tasks_list), test_addr)
            continue

        # Shared fixtures: set the suite up here, then descend into it.
        try:
            case.setUp()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Any other setUp problem is recorded against the suite and
            # its members are not traversed.
            result.addError(case, sys.exc_info())
            continue

        to_teardown.append(case)
        if case.factory:
            for ancestor in case.factory.context.get(case, [])[:2]:
                if getattr(ancestor, '_multiprocess_shared_', False):
                    ancestor._multiprocess_can_split_ = True
        self.collect_tasks(case, tasks_queue, tasks_list, to_teardown, result)

    self.stream.write(" Found %s test cases\n" % len(tasks_queue))
开发者ID:kacarstensen-shift,项目名称:nose_gevent_multiprocess,代码行数:59,代码来源:nose_gevented_multiprocess.py
示例2: test_function_test_case
def test_function_test_case(self):
    # Running a FunctionTestCase must invoke the wrapped function; the
    # function leaves a marker in a list captured through its default
    # argument.
    marks = []

    def func(sink=marks):
        sink.append(1)

    outcome = unittest.TestResult()
    nose.case.FunctionTestCase(func)(outcome)
    assert marks[0] == 1
开发者ID:adpayne237,项目名称:zambiamobile,代码行数:10,代码来源:test_cases.py
示例3: test_result_proxy_used
def test_result_proxy_used(self):
    """A result proxy is used to wrap the result for all tests

    The wrapped test raises; the test asserts that the plain TestResult
    stays free of errors and failures while ``ResultProxy.called``
    records the full hook sequence, including ``addError``.
    """
    class TC(unittest.TestCase):
        def runTest(self):
            raise Exception("error")

    # Empty the shared call log in place before running.
    del ResultProxy.called[:]
    outcome = unittest.TestResult()
    wrapped = nose.case.Test(TC(), config=Config(),
                             resultProxy=ResultProxyFactory())
    wrapped(outcome)

    assert not outcome.errors, outcome.errors
    assert not outcome.failures, outcome.failures
    expected = ['beforeTest', 'startTest', 'addError',
                'stopTest', 'afterTest']
    self.assertEqual([entry[0] for entry in ResultProxy.called], expected)
开发者ID:adpayne237,项目名称:zambiamobile,代码行数:19,代码来源:test_cases.py
示例4: test_function_test_case_fixtures
def test_function_test_case_fixtures(self):
    # A function decorated with with_setup must have its setup, body and
    # teardown all executed, even though the body raises.
    from nose.tools import with_setup

    called = {}

    def st():
        called['st'] = True

    def td():
        called['td'] = True

    @with_setup(st, td)
    def func_exc():
        called['func'] = True
        raise TypeError("An exception")

    outcome = unittest.TestResult()
    nose.case.FunctionTestCase(func_exc)(outcome)

    for step in ('st', 'func', 'td'):
        assert step in called
开发者ID:adpayne237,项目名称:zambiamobile,代码行数:21,代码来源:test_cases.py
示例5: test_case_fixtures_called
def test_case_fixtures_called(self):
    """Instance fixtures are properly called for wrapped tests

    Wraps a TestCase in ``nose.case.Test``, runs it, and asserts that
    setUp, runTest and tearDown were each called once, in that order,
    with no errors or failures recorded on the result.
    """
    res = unittest.TestResult()
    called = []

    class TC(unittest.TestCase):
        # Each hook prints a trace line and records its own name.
        # print is called with one parenthesised argument so the same
        # source parses and prints identically on Python 2 and Python 3.
        def setUp(self):
            print("TC setUp %s" % self)
            called.append('setUp')

        def runTest(self):
            print("TC runTest %s" % self)
            called.append('runTest')

        def tearDown(self):
            print("TC tearDown %s" % self)
            called.append('tearDown')

    case = nose.case.Test(TC())
    case(res)
    assert not res.errors, res.errors
    assert not res.failures, res.failures
    self.assertEqual(called, ['setUp', 'runTest', 'tearDown'])
开发者ID:adpayne237,项目名称:zambiamobile,代码行数:21,代码来源:test_cases.py
示例6: collect
def collect(self, test, testQueue, tasks, to_teardown, result):
    """
    Dispatch every member of ``test`` to the right place.

    Members that are already failures are run immediately against
    ``result``. Context suites with shared fixtures are set up here and
    then traversed recursively; suites whose setUp succeeded are
    appended to ``to_teardown``. Every other member is registered
    through ``self.addtask``.
    """
    # Only addresses go on the queue because tests aren't picklable.
    for case in self.nextBatch(test):
        log.debug("Next batch %s (%s)", case, type(case))

        broken_test = (isinstance(case, nose.case.Test) and
                       isinstance(case.test, failure.Failure))
        is_suite = isinstance(case, ContextSuite)
        if broken_test or (is_suite and case.context is failure.Failure):
            log.debug("Case is a Failure")
            case(result)  # run here to capture the failure
            continue

        if not (is_suite and self.sharedFixtures(case)):
            test_addr = self.addtask(testQueue, tasks, case)
            log.debug("Queued test %s (%s) to %s",
                      len(tasks), test_addr, testQueue)
            continue

        # Shared fixtures: set the suite up here, then descend into it.
        log.debug("%s has shared fixtures", case)
        try:
            case.setUp()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Any other setUp problem is recorded against the suite and
            # its members are not traversed.
            log.debug("%s setup failed", sys.exc_info())
            result.addError(case, sys.exc_info())
            continue

        to_teardown.append(case)
        if case.factory:
            for an in case.factory.context.get(case, [])[:2]:
                if getattr(an, '_multiprocess_shared_', False):
                    an._multiprocess_can_split_ = True
        self.collect(case, testQueue, tasks, to_teardown, result)
开发者ID:ArekSredzki,项目名称:ubcsailbots,代码行数:39,代码来源:multiprocess.py
示例7: run
def run(self, test):
"""
Execute the test (which may be a test suite). If the test is a suite,
distribute it out among as many processes as have been configured, at
as fine a level as is possible given the context fixtures defined in
the suite or any sub-suites.
"""
log.debug("%s.run(%s) (%s)", self, test, os.getpid())
wrapper = self.config.plugins.prepareTest(test)
if wrapper is not None:
test = wrapper
# plugins can decorate or capture the output stream
wrapped = self.config.plugins.setOutputStream(self.stream)
if wrapped is not None:
self.stream = wrapped
testQueue = Queue()
resultQueue = Queue()
tasks = []
completed = []
workers = []
to_teardown = []
shouldStop = Event()
result = self._makeResult()
start = time.time()
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
log.debug("Next batch %s (%s)", case, type(case))
if isinstance(case, nose.case.Test) and isinstance(case.test, failure.Failure):
log.debug("Case is a Failure")
case(result) # run here to capture the failure
continue
# handle shared fixtures
if isinstance(case, ContextSuite) and case.context is failure.Failure:
log.debug("Case is a Failure")
case(result) # run here to capture the failure
continue
elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
log.debug("%s has shared fixtures", case)
try:
case.setUp()
except (KeyboardInterrupt, SystemExit):
raise
except:
log.debug("%s setup failed", sys.exc_info())
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
for _t in case:
test_addr = self.addtask(testQueue, tasks, _t)
log.debug("Queued shared-fixture test %s (%s) to %s", len(tasks), test_addr, testQueue)
else:
test_addr = self.addtask(testQueue, tasks, case)
log.debug("Queued test %s (%s) to %s", len(tasks), test_addr, testQueue)
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
currentaddr = Array("c", 1000)
currentaddr.value = bytes_("")
currentstart = Value("d")
keyboardCaught = Event()
p = Process(
target=runner,
args=(
i,
testQueue,
resultQueue,
currentaddr,
currentstart,
keyboardCaught,
shouldStop,
self.loaderClass,
result.__class__,
pickle.dumps(self.config),
),
)
p.currentaddr = currentaddr
p.currentstart = currentstart
p.keyboardCaught = keyboardCaught
# p.setDaemon(True)
p.start()
workers.append(p)
log.debug("Started worker process %s", i + 1)
total_tasks = len(tasks)
# need to keep track of the next time to check for timeouts in case
# more than one process times out at the same time.
nexttimeout = self.config.multiprocess_timeout
while tasks:
log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs", len(completed), total_tasks, nexttimeout)
try:
iworker, addr, newtask_addrs, batch_result = resultQueue.get(timeout=nexttimeout)
log.debug("Results received for worker %d, %s, new tasks: %d", iworker, addr, len(newtask_addrs))
try:
try:
#.........这里部分代码省略.........
开发者ID:Praveen-Ramanujam,项目名称:MobRAVE,代码行数:101,代码来源:multiprocess.py
示例8: run
def run(self, test):
"""
Execute the test (which may be a test suite). If the test is a suite,
distribute it out among as many processes as have been configured, at
as fine a level as is possible given the context fixtures defined in the
suite or any sub-suites.
"""
log.debug("%s.run(%s) (%s)", self, test, os.getpid())
wrapper = self.config.plugins.prepareTest(test)
if wrapper is not None:
test = wrapper
# plugins can decorate or capture the output stream
wrapped = self.config.plugins.setOutputStream(self.stream)
if wrapped is not None:
self.stream = wrapped
testQueue = Queue()
resultQueue = Queue()
tasks = {}
completed = {}
workers = []
to_teardown = []
shouldStop = Event()
result = self._makeResult()
start = time.time()
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
log.debug("Next batch %s (%s)", case, type(case))
if (isinstance(case, nose.case.Test) and
isinstance(case.test, failure.Failure)):
log.debug("Case is a Failure")
case(result) # run here to capture the failure
continue
# handle shared fixtures
if isinstance(case, ContextSuite) and self.sharedFixtures(case):
log.debug("%s has shared fixtures", case)
try:
case.setUp()
except (KeyboardInterrupt, SystemExit):
raise
except:
log.debug("%s setup failed", sys.exc_info())
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
for _t in case:
test_addr = self.address(_t)
testQueue.put(test_addr, block=False)
tasks[test_addr] = None
log.debug("Queued shared-fixture test %s (%s) to %s",
len(tasks), test_addr, testQueue)
else:
test_addr = self.address(case)
testQueue.put(test_addr, block=False)
tasks[test_addr] = None
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
p = Process(target=runner, args=(i,
testQueue,
resultQueue,
shouldStop,
self.loaderClass,
result.__class__,
pickle.dumps(self.config)))
# p.setDaemon(True)
p.start()
workers.append(p)
log.debug("Started worker process %s", i+1)
num_tasks = len(tasks)
while tasks:
log.debug("Waiting for results (%s/%s tasks)",
len(completed), num_tasks)
try:
addr, batch_result = resultQueue.get(
timeout=self.config.multiprocess_timeout)
log.debug('Results received for %s', addr)
try:
tasks.pop(addr)
except KeyError:
log.debug("Got result for unknown task? %s", addr)
else:
completed[addr] = batch_result
self.consolidate(result, batch_result)
if (self.config.stopOnError
and not result.wasSuccessful()):
# set the stop condition
shouldStop.set()
break
except Empty:
log.debug("Timed out with %s tasks pending", len(tasks))
#.........这里部分代码省略.........
开发者ID:abierbaum,项目名称:vendor_branch_testing_methodA,代码行数:101,代码来源:multiprocess.py
注:本文中的nose.case函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论