• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testify.assert_in函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testify.assert_in函数的典型用法代码示例。如果您正苦于以下问题:Python assert_in函数的具体用法?Python assert_in怎么用?Python assert_in使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了assert_in函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_fail_after_a_while

    def test_fail_after_a_while(self, print_exc_mock, print_warning_mock):
        """Check pool output and logging when workers start raising midway.

        Forks a pool from 100 worker init-args, feeds it 3000 ``-1`` inputs
        followed by ``0..49``, then verifies the yielded
        ``(input, output, type)`` triples, the exceptions handed to
        ``print_exc_mock`` and any warning handed to ``print_warning_mock``.
        """
        worker_args = (worker_raise_exc_with_curleys.init_args(init=i) for i in xrange(100))
        processes = vimap.pool.fork(worker_args, in_queue_size_factor=2)
        processes.imap([-1] * 3000 + list(range(50)))

        # Collect what the pool yields; exceptions are serialized so that
        # they can be compared by value.
        observed = []
        for inp, out, typ in processes.zip_in_out_typ():
            payload = serialize_error(out.value) if typ == "exception" else out
            observed.append((inp, payload, typ))

        # Every -1 input yields a None output; the 50 non-negative inputs are
        # each expected to come back as an exception.
        expected = [(-1, None, "output")] * 3000
        expected += [
            (i, serialize_error(ValueError("{0} curley braces!")), "exception")
            for i in range(50)
        ]
        T.assert_sorted_equal(observed, expected)

        # Each of the 50 exceptions must also have been passed to the
        # exception printer.
        logged_errors = [
            serialize_error(call_args[0].value)
            for call_args, _ in print_exc_mock.call_args_list
        ]
        T.assert_equal(logged_errors, [serialize_error(ValueError("{0} curley braces!"))] * 50)

        # NOTE: Sometimes the weakref in the pool is deleted, so
        # 'has_exceptions' is not set and the pool prints a warning we do not
        # care about. If anything was printed, it must be exactly that one.
        warning_calls = print_warning_mock.call_args_list
        if warning_calls:
            T.assert_equal(len(warning_calls), 1)
            [warning] = warning_calls
            T.assert_in("Pool disposed before input was consumed", warning[0][0])
开发者ID:striglia,项目名称:vimap,代码行数:34,代码来源:exceptions_test.py


示例2: test_deprecated_mapper_final_positional_arg

    def test_deprecated_mapper_final_positional_arg(self):
        """Passing mapper_final positionally to MRJob.mr works but logs a warning."""
        def mapper(k, v):
            pass

        def reducer(k, v):
            pass

        def mapper_final():
            pass

        log_buffer = StringIO()
        with no_handlers_for_logger():
            log_to_stream('mrjob.job', log_buffer)
            positional_step = MRJob.mr(mapper, reducer, mapper_final)

        # The positional form must produce the same step as the keyword form,
        # and a warning must have been written to the captured log.
        keyword_step = MRJob.mr(mapper=mapper, reducer=reducer, mapper_final=mapper_final)
        assert_equal(positional_step, keyword_step)
        assert_in('mapper_final should be specified', log_buffer.getvalue())

        # Supplying mapper_final both positionally and by keyword is an error.
        assert_raises(
            TypeError, MRJob.mr,
            mapper, reducer, mapper_final,
            mapper_final=mapper_final)
开发者ID:gimlids,项目名称:LTPM,代码行数:26,代码来源:job_test.py


示例3: test_invalid_job_collation

    def test_invalid_job_collation(self):
        """A job and a service registered under one name must fail collation."""
        schedule = ConfigIntervalScheduler(timedelta=datetime.timedelta(0, 20))
        action = ConfigAction(
            name='action0_0',
            command='test_command0.0',
            requires=(),
            node=None)
        actions = FrozenDict({'action0_0': action})
        cleanup_action = ConfigCleanupAction(
            command='test_command0.1',
            requires=(),
            name='cleanup',
            node=None)
        job = ConfigJob(
            name='test_collision0',
            node='node0',
            schedule=schedule,
            actions=actions,
            queueing=True,
            run_limit=50,
            all_nodes=False,
            cleanup_action=cleanup_action,
            enabled=True,
            allow_overlap=False)
        jobs = FrozenDict({'test_collision0': job})

        # The service deliberately reuses the job's name.
        service = ConfigService(
            name='test_collision0',
            node='node0',
            pid_file='/var/run/%(name)s-%(instance_number)s.pid',
            command='service_command0',
            monitor_interval=20,
            restart_interval=None,
            count=2)
        services = FrozenDict({'test_collision0': service})

        fake_config = mock.Mock()
        fake_config.jobs = jobs
        fake_config.services = services

        exception = assert_raises(
            ConfigError, collate_jobs_and_services, {'MASTER': fake_config})
        assert_in(
            "Collision found for identifier 'MASTER.test_collision0'",
            str(exception))
开发者ID:strategist922,项目名称:Tron,代码行数:32,代码来源:config_parse_test.py


示例4: test_failing_child_initialized_hook

    def test_failing_child_initialized_hook(self):
        def child_initialized_hook(child_pid):
            raise Exception, "child_initialized hook raises exception"

        # When child_initialized hook fails parent process will
        # exit. To test a failing initilization hook we fork and watch
        # the new child.
        pid = os.fork()
        if not pid:
            event_hooks = {"child_initialized" : child_initialized_hook}
            with testing.no_stderr():
                # This will fail. redirecting stderr to /dev/null will
                # silence the test output.
                self.run_child_function_in_catbox(event_hooks=event_hooks)
        else:
            status = 0
            wait_pid = 0
            try:
                for _ in range(5):
                    (wait_pid, status, _) = os.wait4(pid, os.WNOHANG)
                    if wait_pid == pid:
                        break
                    time.sleep(.1)
            except OSError, e:
                T.assert_in("No child processes", e)
            else:
开发者ID:Pardus-Linux,项目名称:catbox,代码行数:26,代码来源:test_event_hooks.py


示例5: test_discover_test_with_unknown_import_error

 def test_discover_test_with_unknown_import_error(self):
     """Ensure a discovery failure is reported when a test module raises an unusual exception on import."""
     command = ('python', '-m', 'testify.test_program', self.broken_import_module)
     out, err = cmd_output(*command)
     T.assert_in('DISCOVERY FAILURE', out)
     T.assert_in('AttributeError: aaaaa!', err)
开发者ID:chriskuehl,项目名称:Testify,代码行数:7,代码来源:discovery_failure_test.py


示例6: test_contains_ancestral

    def test_contains_ancestral(self):
        """A child ChainedDict contains its parent's keys, but not the reverse."""
        parent = ChainedDict(the_key=True)
        child = ChainedDict(parent=parent, the_other_key=True)

        # The child sees both its own key and the inherited one ...
        T.assert_in("the_key", child)
        T.assert_in("the_other_key", child)
        # ... while the parent is unaware of the child's key.
        T.assert_not_in("the_other_key", parent)
开发者ID:PLOS-Web,项目名称:py_meter_ingest,代码行数:7,代码来源:chained_dict_test.py


示例7: test_exception_in_setup_phase

    def test_exception_in_setup_phase(self):
        """Check reporting and requeueing when class_setup raises.

        An exception in a class_setup method is reported as an error in every
        test method of the test case; those methods are then treated as
        flakes and re-run.
        """
        def assert_reported_methods(base_methods):
            # Compare the methods reported so far against ``base_methods``.
            #
            # self.run_test collects results submitted at class_teardown
            # completion time, and class_setup_teardown methods report their
            # teardown phase at that point, so such a method shows up as an
            # "extra" entry. Child classes that exercise class_setup_teardown
            # set self.class_setup_teardown_method_name so it can be added.
            expected_methods = set(base_methods)
            if hasattr(self, 'class_setup_teardown_method_name'):
                expected_methods.add(self.class_setup_teardown_method_name)
            seen_methods = self.get_seen_methods(self.test_reporter.test_complete.calls)
            # A symmetric difference gives a clearer diff than comparing the
            # two sets directly.
            assert_equal(expected_methods.symmetric_difference(seen_methods), set())

        # Pull the test case; running it below causes class_setup to run.
        test_case = get_test(self.server, 'runner')
        assert_equal(len(test_case['methods']), 3)
        # The final entry is the special 'run' method, which signals that the
        # whole test case (class_teardown included) is complete.
        assert_equal(test_case['methods'][-1], 'run')

        self.run_test('runner')

        # 'classTearDown' is a deprecated synonym for 'class_teardown'; it is
        # not interesting here, but it is reported. The test methods
        # themselves failed in class_setup and were requeued as flakes, so
        # they are not complete and not reported yet.
        assert_reported_methods(['classTearDown', 'run'])

        # The failed test case must have been requeued.
        assert_equal(self.server.test_queue.empty(), False)
        requeued_test_case = get_test(self.server, 'runner2')
        assert_in(self.dummy_test_case.__name__, requeued_test_case['class_path'])

        # Start from a fresh reporter before the second run.
        self.test_reporter.test_complete = turtle.Turtle()

        self.run_test('runner2')

        # The test methods have now been re-run as flakes and are complete,
        # so they are reported as well.
        assert_reported_methods(['test1', 'test2', 'classTearDown', 'run'])

        # Nothing further may have been requeued.
        assert_equal(self.server.test_queue.empty(), True)
开发者ID:MiloJiang,项目名称:Testify,代码行数:60,代码来源:test_runner_server_test.py


示例8: test_bad_requires

    def test_bad_requires(self):
        test_config = (
            BASE_CONFIG
            + """
jobs:
    -
        name: "test_job0"
        node: node0
        schedule: "interval 20s"
        actions:
            -
                name: "action0_0"
                command: "test_command0.0"
            -
                name: "action0_1"
                command: "test_command0.1"

    -
        name: "test_job1"
        node: node0
        schedule: "interval 20s"
        actions:
            -
                name: "action1_0"
                command: "test_command1.0"
                requires: action0_0

        """
        )
        expected_message = "jobs.test_job1.action1_0 has a dependency " '"action0_0" that is not in the same job!'
        exception = assert_raises(ConfigError, load_config, test_config)
        assert_in(expected_message, str(exception))
开发者ID:anthonypt87,项目名称:Tron,代码行数:32,代码来源:config_parse_test.py


示例9: test_overlap_node_and_node_pools

 def test_overlap_node_and_node_pools(self):
     """A node and a node pool that share a name must fail validation."""
     node = dict(name="sameName", hostname="localhost")
     node_pool = dict(name="sameName", nodes=["sameNode"])
     tron_config = dict(nodes=[node], node_pools=[node_pool])

     exception = assert_raises(ConfigError, valid_config, tron_config)
     assert_in("Node and NodePool names must be unique sameName", str(exception))
开发者ID:anthonypt87,项目名称:Tron,代码行数:7,代码来源:config_parse_test.py


示例10: test_list_path_no_path_duplicates

	def test_list_path_no_path_duplicates(self):
		"""Root listing with a repeated key.

		With no path given, the correct results must be returned and the
		repeated key must not be cached against the wrong data source.
		"""
		test_path = None

		# One root directory entry per configured data source.
		expected_paths = []
		for i in xrange(len(self.data_source.data_sources)):
			expected_paths.append({'name': 'src.MajorSource%d' % i, 'type': 'dir'})

		with self._mock_ds_method('_request_paths_from_ds') as mock_request_paths:
			per_call_results = [[path] for path in expected_paths]
			# The last response additionally carries a 'src.MajorSource1'
			# entry; this is the repeated key.
			per_call_results[-1].append({'name': 'src.MajorSource1', 'type': 'dir'})
			mock_request_paths.side_effect = per_call_results

			actual_paths = self.data_source.list_path(test_path)

			# Every data source must have been queried with the same path.
			T.assert_equal(mock_request_paths.call_count, len(self.data_source.data_sources))
			for ds in self.data_source.data_sources:
				mock_request_paths.assert_any_call(ds, test_path)

		T.assert_equal(expected_paths, actual_paths)

		# Each root key must be cached against the data source at the same
		# position in the data source list.
		for expected_path, expected_data_source in zip(expected_paths, self.data_source.data_sources):
			key = expected_path['name']
			T.assert_in(key, self.data_source.key_mapping_cache)
			T.assert_equal(self.data_source.key_mapping_cache[key], expected_data_source)
开发者ID:DajunKou,项目名称:firefly,代码行数:31,代码来源:aggregating_data_source_test.py


示例11: test_process_queue_duplicate

    def test_process_queue_duplicate(self):
        """A request whose revision sha matches another request fails verification."""
        duplicate_req = copy.deepcopy(self.fake_request)
        duplicate_req['id'] = 11

        # A value returned from GitQueue._get_request_with_sha means a
        # duplicated request exists, which should trigger a failure.
        existing_request = {'id': 10, 'state': 'requested'}
        with nested(
            mock.patch("%s.pushmanager.core.git.GitQueue.verify_branch_failure" % __name__),
            mock.patch("%s.pushmanager.core.git.GitQueue.verify_branch_successful" % __name__),
            # The failure path logs an error; silence it.
            mock.patch("%s.pushmanager.core.git.logging.error" % __name__),
            mock.patch(
                "%s.pushmanager.core.git.GitQueue._get_request_with_sha" % __name__,
                return_value=existing_request
            ),
            self.mocked_update_request(self.fake_request, duplicate_req)
        ):
            git_queue = pushmanager.core.git.GitQueue
            T.assert_equal(git_queue.verify_branch_failure.call_count, 1)
            T.assert_equal(git_queue.verify_branch_successful.call_count, 0)

            # The error message is the second positional argument of the
            # first recorded call (mock stores each call as (args, kwargs)).
            first_call_args = git_queue.verify_branch_failure.call_args_list[0][0]
            T.assert_in("another request with the same revision sha", first_call_args[1])
开发者ID:Mango-J,项目名称:pushmanager,代码行数:27,代码来源:test_core_git.py


示例12: test_list_path_no_path

	def test_list_path_no_path(self):
		"""Check list_path when asked for the root keys (no path specified)."""
		test_path = None

		# One root directory entry per configured data source.
		expected_paths = []
		for i in xrange(len(self.data_source.data_sources)):
			expected_paths.append({'name': 'src.MajorSource%d' % i, 'type': 'dir'})

		with self._mock_ds_method('_request_paths_from_ds') as mock_request_paths:
			# Successive calls each return a single-entry list, in order.
			mock_request_paths.side_effect = [[path] for path in expected_paths]

			actual_paths = self.data_source.list_path(test_path)

			# Every data source must have been queried with the same path.
			T.assert_equal(mock_request_paths.call_count, len(self.data_source.data_sources))
			for ds in self.data_source.data_sources:
				mock_request_paths.assert_any_call(ds, test_path)

		T.assert_equal(expected_paths, actual_paths)

		# Each root key must be cached against the data source at the same
		# position in the data source list.
		for expected_path, expected_data_source in zip(expected_paths, self.data_source.data_sources):
			key = expected_path['name']
			T.assert_in(key, self.data_source.key_mapping_cache)
			T.assert_equal(self.data_source.key_mapping_cache[key], expected_data_source)
开发者ID:DajunKou,项目名称:firefly,代码行数:26,代码来源:aggregating_data_source_test.py


示例13: test_find_data_source_for_stat_key

	def test_find_data_source_for_stat_key(self):
		"""Check _find_data_source_for_stat_key when one configured data
		source provides the key.
		"""
		expected_data_source = {
			'data_server_url': "http://b.com",
			'data_source_hash': util.generate_ds_key("another.data.source"),
			'secret_key': "TEST_SECRET_TWO"
		}
		test_key = 'src.our_key'

		def fake_paths_from_ds(data_source, path):
			# Only the expected data source reports the key under test.
			if data_source == expected_data_source:
				name = test_key
			else:
				name = "src.not_our_key"
			return [{"name": name}]

		with mock.patch.object(self.data_source, '_request_paths_from_ds', fake_paths_from_ds):
			actual_ds = self.data_source._find_data_source_for_stat_key(test_key)

		T.assert_equal(expected_data_source, actual_ds)

		# The lookup result must also have been stored in the key cache.
		cache = self.data_source.key_mapping_cache
		T.assert_in(test_key, cache)
		T.assert_equal(expected_data_source, cache[test_key])
开发者ID:DajunKou,项目名称:firefly,代码行数:25,代码来源:aggregating_data_source_test.py


示例14: verify_message_from_child

 def verify_message_from_child(self, expected_message=None):
     """Assert that the polled message contains the expected text.

     A falsy ``expected_message`` falls back to
     ``self.default_expected_message_from_child``. Raises
     ChildDidNotReportBackException when ``self.poll()`` returns nothing.
     """
     if not expected_message:
         expected_message = self.default_expected_message_from_child
     actual_message_from_child = self.poll()
     if not actual_message_from_child:
         raise ChildDidNotReportBackException
     T.assert_in(expected_message, actual_message_from_child)
开发者ID:bukzor,项目名称:catbox,代码行数:7,代码来源:testing.py


示例15: assert_checklist_for_tags

    def assert_checklist_for_tags(self, tags, requestid=None):
        """Create a request with ``tags`` and verify the checklists it gets.

        The expected checklist entries are taken from ``checklist_reminders``:
        for every tag that has reminders, its own list plus its
        '<tag>-cleanup' list. Returns the id of the request that was made.
        """
        expected_checks = []
        for tag in tags:
            # The tag is called 'search-backend', but its checklist type is
            # truncated to 'search'.
            checklist_type = 'search' if tag == 'search-backend' else tag
            if checklist_type not in checklist_reminders:
                continue

            expected_checks.extend(
                (checklist_type, check) for check in checklist_reminders[checklist_type])

            cleanup_type = '%s-cleanup' % checklist_type
            expected_checks.extend(
                (cleanup_type, check) for check in checklist_reminders[cleanup_type])

        reqid = self.make_request_with_tags(tags, requestid)
        checklists = self.get_checklists(reqid)

        T.assert_equal(len(expected_checks), len(checklists))
        for check_type, check in expected_checks:
            T.assert_in((reqid, check_type, check), checklists)

        return reqid
开发者ID:eevee,项目名称:pushmanager,代码行数:31,代码来源:test_servlet_newrequest.py


示例16: test_checklist_duplicate

    def test_checklist_duplicate(self):
        """Check the checklist page when two requests in a push share items.

        Both test users' requests get the same 'search' and 'search-cleanup'
        checklist entries; the rendered page must then contain text matching
        ``for testuser<digit>,testuser<digit>``.
        """
        with fake_checklist_request():
            # insert fake data from FakeDataMixin
            fake_pushid = 2
            self.insert_pushes()
            self.insert_requests()
            test1_request = self.get_requests_by_user('testuser1')[0]
            test2_request = self.get_requests_by_user('testuser2')[0]
            self.insert_pushcontent(test1_request['id'], fake_pushid)
            self.insert_pushcontent(test2_request['id'], fake_pushid)

            # Give both requests the same pair of checklist entries.
            checklist_queries = []
            for req in (test1_request, test2_request):
                checklist_queries.append(db.push_checklist.insert({
                    'request': req['id'],
                    'type': 'search',
                    'target': 'prod'
                }))
                checklist_queries.append(db.push_checklist.insert({
                    'request': req['id'],
                    'type': 'search-cleanup',
                    'target': 'post-verify-prod'
                }))
            db.execute_transaction_cb(checklist_queries, on_db_return)

            uri = "/checklist?id=%d" % fake_pushid
            response = self.fetch(uri)
            T.assert_equal(response.error, None)
            T.assert_not_in("No checklist items for this push", response.body)
            # Raw string: "\d" in a plain literal is an invalid escape
            # sequence. The pattern itself is unchanged.
            T.assert_not_equal(re.search(r"for testuser\d,testuser\d", response.body), None)
            T.assert_in("Before Certifying - Do In Prod", response.body)
开发者ID:Mango-J,项目名称:pushmanager,代码行数:32,代码来源:test_servlet_checklist.py


示例17: test_messy_error

 def test_messy_error(self):
     """A malformed COUNTERS field parses to (None, None) and logs a warning."""
     counter_string = 'Job JOBID="_001" FAILED_REDUCES="0" COUNTERS="THIS IS NOT ACTUALLY A COUNTER"'
     with no_handlers_for_logger(''):
         log_buffer = StringIO()
         log_to_stream('mrjob.parse', log_buffer, level=logging.WARN)
         parsed = parse_hadoop_counters_from_line(counter_string)
         assert_equal((None, None), parsed)
         assert_in('Cannot parse Hadoop counter line', log_buffer.getvalue())
开发者ID:gimlids,项目名称:LTPM,代码行数:7,代码来源:parse_test.py


示例18: test_hoods_checklists

    def test_hoods_checklists(self):
        """The checklist page shows the 'hoods' reminders for stage and prod."""
        with fake_checklist_request():
            # insert fake data from FakeDataMixin
            fake_pushid = 2
            self.insert_pushes()
            self.insert_requests()
            req = self.get_requests_by_user('testuser1')[0]
            self.insert_pushcontent(req['id'], fake_pushid)

            # One checklist row per (type, target) pair, all for the same
            # request.
            type_target_pairs = (
                ('hoods', 'stage'),
                ('hoods', 'prod'),
                ('hoods-cleanup', 'post-verify-stage'),
            )
            checklist_queries = [
                db.push_checklist.insert(
                    {'request': req['id'], 'type': item_type, 'target': target})
                for item_type, target in type_target_pairs
            ]
            db.execute_transaction_cb(checklist_queries, on_db_return)

            response = self.fetch("/checklist?id=%d" % fake_pushid)
            T.assert_equal(response.error, None)

            page = response.body
            T.assert_not_in("No checklist items for this push", page)
            T.assert_in("Notify testuser1 to deploy Geoservices to stage", page)
            T.assert_in("Notify testuser1 to deploy Geoservices to prod", page)
开发者ID:Mango-J,项目名称:pushmanager,代码行数:27,代码来源:test_servlet_checklist.py


示例19: test_bad_requires

    def test_bad_requires(self):
        test_config = BASE_CONFIG + """
jobs:
    -
        name: "test_job0"
        node: node0
        schedule: "interval 20s"
        actions:
            -
                name: "action0_0"
                command: "test_command0.0"
            -
                name: "action0_1"
                command: "test_command0.1"

    -
        name: "test_job1"
        node: node0
        schedule: "interval 20s"
        actions:
            -
                name: "action1_0"
                command: "test_command1.0"
                requires: [action0_0]

        """
        expected_message = ('jobs.MASTER.test_job1.action1_0 has a dependency '
                '"action0_0" that is not in the same job!')
        exception = assert_raises(ConfigError, valid_config_from_yaml, test_config)
        assert_in(expected_message, str(exception))
开发者ID:ContextLogic,项目名称:Tron,代码行数:30,代码来源:config_parse_test.py


示例20: test_create_scratch_uri

    def test_create_scratch_uri(self):
        """Check creation and re-use of the runner's generated S3 scratch URI.

        The URI must have the form ``s3://mrjob-<16 chars>/tmp/``. Its bucket
        must not exist until a job flow is launched, and a later runner must
        pick the same URI.
        """
        # "walrus" bucket will be ignored; it doesn't start with "mrjob-"
        self.add_mock_s3_data({'walrus': {}, 'zebra': {}})

        runner = EMRJobRunner(conf_path=False, s3_sync_wait_time=0.01)

        # bucket name should be mrjob- plus 16 random hex digits:
        # len('s3://mrjob-') == 11, and 11 + 16 == 27.
        s3_scratch_uri = runner._opts['s3_scratch_uri']
        assert_equal(s3_scratch_uri[:11], 's3://mrjob-')
        assert_equal(s3_scratch_uri[27:], '/tmp/')

        # bucket shouldn't actually exist yet
        scratch_bucket, _ = parse_s3_uri(s3_scratch_uri)
        assert_not_in(scratch_bucket, self.mock_s3_fs.keys())

        # need to do something to ensure that the bucket actually gets
        # created. let's launch a (mock) job flow
        jfid = runner.make_persistent_job_flow()
        assert_in(scratch_bucket, self.mock_s3_fs.keys())
        runner.make_emr_conn().terminate_jobflow(jfid)

        # once our scratch bucket is created, we should re-use it
        runner2 = EMRJobRunner(conf_path=False)
        assert_equal(runner2._opts['s3_scratch_uri'], s3_scratch_uri)
开发者ID:boursier,项目名称:mrjob,代码行数:25,代码来源:emr_test.py



注:本文中的testify.assert_in函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testify.assert_not_in函数代码示例发布时间:2022-05-27
下一篇:
Python testify.assert_equal函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap