• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python mock.patch函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 six.moves.mock.patch 函数的典型用法代码示例。如果您正苦于以下问题：Python patch 函数的具体用法？Python patch 怎么用？Python patch 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 patch 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: set_patches

 def set_patches(self):
     """Register the patchers used by the MD RAID array device method tests.

     Runs the parent class's ``set_patches`` first, then stores four more
     patchers in ``self.patchers`` under the keys ``md``, ``is_disk``,
     ``pvs_info`` and ``lvm``.
     """
     super(MDRaidArrayDeviceMethodsTestCase, self).set_patches()

     # Replacement for the device class's ``is_disk`` attribute: a property
     # mock that reports False.
     not_a_disk = PropertyMock(return_value=False)

     extra_patchers = (
         ("md", patch("blivet.devices.md.blockdev.md")),
         ("is_disk", patch.object(self.device_class, "is_disk", new=not_a_disk)),
         ("pvs_info", patch("blivet.devices.md.pvs_info")),
         ("lvm", patch("blivet.devices.md.blockdev.lvm")),
     )
     for key, patcher in extra_patchers:
         self.patchers[key] = patcher
开发者ID:rhinstaller,项目名称:blivet,代码行数:7,代码来源:device_methods_test.py


示例2: test_teardown

    def test_teardown(self):
        """Run the inherited teardown test, then check that the LV is deactivated."""
        # The inherited test runs with lvmetad_socket_exists patched to return False.
        no_lvmetad = patch("blivet.devicelibs.lvm.lvmetad_socket_exists", return_value=False)
        with no_lvmetad:
            super(LVMLogicalVolumeDeviceMethodsTestCase, self).test_teardown()

        # With the blockdev lvm plugin mocked out, _teardown() must call lvdeactivate.
        lvm_patch = patch("blivet.devices.lvm.blockdev.lvm")
        with lvm_patch as lvm_mock:
            self.device._teardown()
            self.assertTrue(lvm_mock.lvdeactivate.called)
开发者ID:rhinstaller,项目名称:blivet,代码行数:7,代码来源:device_methods_test.py


示例3: set_patches

 def set_patches(self):
     """Register the patchers used by the filesystem format method tests.

     Runs the parent class's ``set_patches`` first, then stores four more
     patchers in ``self.patchers`` under the keys ``udev``, ``util``,
     ``system_mountpoint`` and ``fs_os``.
     """
     super(FSMethodsTestCase, self).set_patches()

     # Replacement for the format class's ``system_mountpoint`` attribute: a
     # property mock that always reports a fixed fake path.
     fake_mountpoint = PropertyMock(return_value='/fake/mountpoint')

     extra_patchers = (
         ("udev", patch("blivet.formats.fs.udev")),
         ("util", patch("blivet.formats.fs.util")),
         ("system_mountpoint", patch.object(self.format_class,
                                            "system_mountpoint",
                                            new=fake_mountpoint)),
         ("fs_os", patch("blivet.formats.fs.os")),
     )
     for key, patcher in extra_patchers:
         self.patchers[key] = patcher
开发者ID:rhinstaller,项目名称:blivet,代码行数:8,代码来源:methods_test.py


示例4: setUp

 def setUp(self):
     """Start one manual patch with a cleanup, then run the base setUp.

     Also installs a ``MockPatch`` fixture and stores a second patcher on
     ``self.unstopped``; that patcher is only created here, never started.
     """
     # The cleanup that undoes this patch() call is registered *before*
     # the base class version of setUp() runs.
     keys_patch = mock.patch('os.environ.keys')
     keys_patch.start()
     self.addCleanup(keys_patch.stop)

     super(TestManualMock, self).setUp()

     timeout_fixture = fixtures.MockPatch('fixtures.Timeout')
     self.useFixture(timeout_fixture)

     # Created but not started in this method.
     self.unstopped = mock.patch('os.environ.put')
开发者ID:openstack,项目名称:oslotest,代码行数:9,代码来源:test_base.py


示例5: test_load_notifiers_no_extensions

    def test_load_notifiers_no_extensions(self):
        """With an empty routing config, no debug logging and no routing groups."""
        self.config(routing_notifier_config="routing_notifier.yaml")

        # Stand-in for the config-file getter; calling it yields an empty string.
        config_file = mock.MagicMock(return_value=r"")

        # The three patches are entered left to right, exactly as if nested.
        with mock.patch.object(self.router, '_get_notifier_config_file',
                               config_file), \
                mock.patch('stevedore.dispatch.DispatchExtensionManager',
                           return_value=self._empty_extension_manager()), \
                mock.patch('oslo_messaging.notify._impl_routing.LOG') as mylog:
            self.router._load_notifiers()
            self.assertFalse(mylog.debug.called)

        self.assertEqual({}, self.router.routing_groups)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:15,代码来源:test_notifier.py


示例6: test_lsm_error_handling

    def test_lsm_error_handling(self):
        """Check how update_volume_info() reacts when lsm.Client() raises."""
        with patch("blivet.devicelibs.disk._lsm_required._check_avail", return_value=True), \
                patch("blivet.devicelibs.disk.lsm") as lsm_mock:
            lsm_mock.LsmError = FakeLsmError

            # an LsmError from lsm.Client() must leave the volumes dict empty
            lsm_mock.Client.side_effect = raise_lsm_error
            disklib.update_volume_info()
            self.assertEqual(disklib.volumes, {})

            # any error other than LsmError must propagate to the caller
            lsm_mock.Client.side_effect = raise_other_error
            self.assertRaises(OtherError, disklib.update_volume_info)
开发者ID:rhinstaller,项目名称:blivet,代码行数:15,代码来源:disk_test.py


示例7: test_ignore_req_opt

    def test_ignore_req_opt(self):
        """Requests whose method is in ``ignore_req_list`` emit no notifications."""
        m = middleware.RequestNotifier(FakeApp(),
                                       ignore_req_list='get, PUT')

        def blank_request(path, method):
            # Build a blank webob request for *path* using HTTP verb *method*.
            return webob.Request.blank(path,
                                       environ={'REQUEST_METHOD': method})

        get_req = blank_request('/skip/foo', 'GET')
        put_req = blank_request('/skip/foo', 'PUT')
        post_req = blank_request('/accept/foo', 'POST')

        with mock.patch(
                'oslo_messaging.notify.notifier.Notifier._notify') as notify:
            # GET and PUT are both on the ignore list: nothing is sent
            m(get_req)
            m(put_req)
            self.assertEqual(0, len(notify.call_args_list))

            # POST is not on the ignore list: two notifications are sent
            m(post_req)
            self.assertEqual(2, len(notify.call_args_list))

            # first notification: the request event
            request_args = notify.call_args_list[0][0]
            self.assertEqual('http.request', request_args[1])
            self.assertEqual('INFO', request_args[3])
            self.assertEqual({'request'}, set(request_args[2].keys()))

            request = request_args[2]['request']
            self.assertEqual('/accept/foo', request['PATH_INFO'])
            self.assertEqual('POST', request['REQUEST_METHOD'])

            # second notification: the response event
            response_args = notify.call_args_list[1][0]
            self.assertEqual('http.response', response_args[1])
            self.assertEqual('INFO', response_args[3])
            self.assertEqual({'request', 'response'},
                             set(response_args[2].keys()))
开发者ID:openstack,项目名称:oslo.messaging,代码行数:34,代码来源:test_middleware.py


示例8: test_add_device

    def test_add_device(self, *args):  # pylint: disable=unused-argument
        """Exercise DeviceTree._add_device() for success and failure cases.

        Covers: adding a parentless device (the device_added callback fires
        and the device and its name are recorded in the tree), re-adding the
        same device (ValueError), adding a device whose parents are not all
        in the tree (DeviceTreeError), and finally adding the missing parent
        followed by the child.

        :param args: extra positional arguments; accepted and ignored
        """
        dt = DeviceTree()

        dev1 = StorageDevice("dev1", exists=False, uuid=sentinel.dev1_uuid, parents=[])

        self.assertEqual(dt.devices, [])

        # things are called, updated as expected when a device is added
        with patch("blivet.devicetree.callbacks") as callbacks:
            dt._add_device(dev1)
            self.assertTrue(callbacks.device_added.called)

        # assertIn/assertNotIn report the container contents on failure,
        # unlike assertTrue(x in y) which only reports "False is not true".
        self.assertEqual(dt.devices, [dev1])
        self.assertIn(dev1, dt.devices)
        self.assertIn(dev1.name, dt.names)
        self.assertTrue(dev1.add_hook.called)  # pylint: disable=no-member

        # adding an already-added device fails
        six.assertRaisesRegex(self, ValueError, "already in tree", dt._add_device, dev1)

        dev2 = StorageDevice("dev2", exists=False, parents=[])
        dev3 = StorageDevice("dev3", exists=False, parents=[dev1, dev2])

        # adding a device with one or more parents not already in the tree fails
        six.assertRaisesRegex(self, DeviceTreeError, "parent.*not in tree", dt._add_device, dev3)
        self.assertNotIn(dev2, dt.devices)
        self.assertNotIn(dev2.name, dt.names)

        # once the missing parent is in the tree, the child can be added too
        dt._add_device(dev2)
        self.assertIn(dev2, dt.devices)
        self.assertIn(dev2.name, dt.names)

        dt._add_device(dev3)
        self.assertIn(dev3, dt.devices)
        self.assertIn(dev3.name, dt.names)
开发者ID:rhinstaller,项目名称:blivet,代码行数:35,代码来源:devicetree_test.py


示例9: _test_destroy_backend

 def _test_destroy_backend(self):
     """Destroy an existing format and check the wipefs invocation.

     ``run_program`` is mocked to return 0; after ``destroy()`` the format
     must no longer exist and ``run_program`` must have been called last
     with the wipefs argument list for the format's device.
     """
     with patch("blivet.formats.run_program", return_value=0) as run_program:
         self.format.exists = True
         self.format.destroy()
         self.assertFalse(self.format.exists)

         expected_argv = ["wipefs", "-f", "-a", self.format.device]
         run_program.assert_called_with(expected_argv)
开发者ID:rhinstaller,项目名称:blivet,代码行数:7,代码来源:methods_test.py


示例10: test_destroy

    def test_destroy(self):
        """Run the inherited destroy test, then check that the LV is removed."""
        # The inherited test runs with the device's teardown method mocked out.
        teardown_patch = patch.object(self.device, "teardown")
        with teardown_patch:
            super(LVMLogicalVolumeDeviceMethodsTestCase, self).test_destroy()

        # With the blockdev lvm plugin mocked out, _destroy() must call lvremove.
        lvm_patch = patch("blivet.devices.lvm.blockdev.lvm")
        with lvm_patch as lvm_mock:
            self.device._destroy()
            self.assertTrue(lvm_mock.lvremove.called)
开发者ID:rhinstaller,项目名称:blivet,代码行数:7,代码来源:device_methods_test.py


示例11: test_reconnect_order

    def test_reconnect_order(self):
        """Reconnecting once per broker produces connection calls for every broker."""
        brokers = ['host1', 'host2', 'host3', 'host4', 'host5']

        self.config(qpid_hosts=brokers,
                    group='oslo_messaging_qpid')

        with mock.patch('qpid.messaging.Connection') as conn_mock:
            # the initial connection starts from the first broker in the list
            url = oslo_messaging.TransportURL.parse(self.conf, None)
            connection = qpid_driver.Connection(self.conf, url,
                                                amqp.PURPOSE_SEND)

            # each reconnect advances to the next broker and wraps back to
            # the start of the list once the end is reached; do it once
            # per configured broker
            for _ in brokers:
                connection.reconnect()

        def calls_for(broker):
            # The calls expected on the patched Connection for one broker.
            return [mock.call("%s:5672" % broker),
                    mock.call().open(),
                    mock.call().session(),
                    mock.call().opened(),
                    mock.call().opened().__nonzero__(),
                    mock.call().close()]

        expected = [call
                    for broker in brokers
                    for call in calls_for(broker)]

        conn_mock.assert_has_calls(expected, any_order=True)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:29,代码来源:test_impl_qpid.py


示例12: test_listen_and_direct_send

    def test_listen_and_direct_send(self):
        """listen() creates three receivers; direct_send() uses the direct address."""
        target = oslo_messaging.Target(exchange="exchange_test",
                                       topic="topic_test",
                                       server="server_test")

        with mock.patch('qpid.messaging.Connection') as conn_cls:
            session = conn_cls.return_value.session.return_value
            # one distinct receiver mock per expected consumer
            session.receiver.side_effect = [mock.Mock() for _ in range(3)]

            listener = self.driver.listen(target)
            listener.conn.direct_send("msg_id", {})

        self.assertEqual(3, len(listener.conn.consumers))

        receiver_addresses = (
            'amq.topic/topic/exchange_test/topic_test',
            'amq.topic/topic/exchange_test/topic_test.server_test',
            'amq.topic/fanout/topic_test',
        )
        expected_calls = [mock.call(AddressNodeMatcher(address))
                          for address in receiver_addresses]
        session.receiver.assert_has_calls(expected_calls)

        direct_address = AddressNodeMatcher("amq.direct/msg_id")
        session.sender.assert_called_with(direct_address)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:26,代码来源:test_impl_qpid.py


示例13: test_declared_queue_publisher

    def test_declared_queue_publisher(self):
        """Check exchange declaration and the declared-exchange cache on publish.

        Publishing to a passive exchange fails until the exchange has been
        created actively; a publish error empties ``_declared_exchanges`` and
        a later active publish repopulates it.
        """
        transport = oslo_messaging.get_transport(self.conf,
                                                 'kombu+memory:////')
        self.addCleanup(transport.cleanup)

        def topic_exchange(passive):
            # Build the 'foobar' topic exchange in passive or active mode.
            return kombu.entity.Exchange(
                name='foobar',
                type='topic',
                passive=passive)

        e_passive = topic_exchange(True)
        e_active = topic_exchange(False)

        with transport._driver._get_connection(
                driver_common.PURPOSE_SEND) as pool_conn:
            conn = pool_conn.connection
            # first channel-error type of the underlying connection; used
            # below as the side effect of the patched publish
            exc = conn.connection.channel_errors[0]

            def try_send(exchange):
                conn._ensure_publishing(
                    conn._publish_and_creates_default_queue,
                    exchange, {}, routing_key='foobar')

            with mock.patch('kombu.transport.virtual.Channel.close'):
                # The exchange does not exist yet, so a passive send fails
                with self.assertRaises(oslo_messaging.MessageDeliveryFailure):
                    try_send(e_passive)
                # An active send creates it
                try_send(e_active)
                # Now the passive send succeeds
                try_send(e_passive)

            with mock.patch('kombu.messaging.Producer.publish',
                            side_effect=exc), \
                    mock.patch('kombu.transport.virtual.Channel.close'):
                # The exchange is already in the cache
                self.assertIn('foobar', conn._declared_exchanges)
                # A failing publish resets the connection
                with self.assertRaises(oslo_messaging.MessageDeliveryFailure):
                    try_send(e_passive)
                # The cache is now empty
                self.assertEqual(0, len(conn._declared_exchanges))

            try_send(e_active)
            self.assertIn('foobar', conn._declared_exchanges)
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:47,代码来源:test_impl_rabbit.py


示例14: test_notify

    def test_notify(self):
        """notify_send() must construct exactly one confluent_kafka Producer."""
        with mock.patch("confluent_kafka.Producer") as producer:
            self.driver.pconn.notify_send("fake_topic",
                                          {"fake_ctxt": "fake_param"},
                                          {"fake_text": "fake_message_1"},
                                          10)
            # Use the TestCase assertion rather than a bare ``assert``: bare
            # asserts are stripped under ``python -O`` and report no values
            # on failure.
            self.assertEqual(1, producer.call_count)
开发者ID:openstack,项目名称:oslo.messaging,代码行数:8,代码来源:test_impl_kafka.py


示例15: testEventMask

    def testEventMask(self):
        """Check which event masks suppress the handler callback.

        An event is delivered with no mask, with masks that match only the
        device or only the action (handler is called), and with masks that
        match device and action, device only, or action only (handler is
        not called).
        """
        handler_cb = Mock()
        with patch("blivet.events.manager.validate_cb", return_value=True):
            mgr = FakeEventManager(handler_cb=handler_cb)

        device, action = "sdc", "add"

        def fire():
            # Deliver the event, then wait one second before the handler
            # mock is inspected (presumably handling is asynchronous --
            # TODO confirm against the event manager).
            mgr.handle_event(action, device)
            time.sleep(1)

        def check_handled():
            # The handler ran exactly once and received this very event.
            fire()
            self.assertEqual(handler_cb.call_count, 1)
            event = handler_cb.call_args[1]["event"]  # pylint: disable=unsubscriptable-object
            self.assertEqual(event.device, device)
            self.assertEqual(event.action, action)

        def check_ignored():
            # The handler did not run at all.
            fire()
            self.assertEqual(handler_cb.call_count, 0)

        # no mask registered -> event is handled
        check_handled()

        # mask matches device but not action -> event is handled
        handler_cb.reset_mock()
        mask = mgr.add_mask(device=device, action=action + 'x')
        check_handled()

        # mask matches action but not device -> event is handled
        # NOTE(review): the mask added in the previous step is overwritten
        # here without a remove_mask() call, so it stays registered.
        handler_cb.reset_mock()
        mask = mgr.add_mask(device=device + 'x', action=action)
        check_handled()

        # mask matches device and action -> event is ignored
        handler_cb.reset_mock()
        mgr.remove_mask(mask)
        mask = mgr.add_mask(device=device, action=action)
        check_ignored()

        # device-only mask matches -> event is ignored
        handler_cb.reset_mock()
        mgr.remove_mask(mask)
        mask = mgr.add_mask(device=device)
        check_ignored()

        # action-only mask matches -> event is ignored
        handler_cb.reset_mock()
        mgr.remove_mask(mask)
        mask = mgr.add_mask(action=action)
        check_ignored()
        mgr.remove_mask(mask)
开发者ID:rhinstaller,项目名称:blivet,代码行数:58,代码来源:events_test.py


示例16: test_best_label_type

    def test_best_label_type(self, arch):
        """Check DiskLabel._get_best_label_type() across mocked architectures.

        Cases exercised:
            1. no special arch: "msdos" when the size check passes, None when
               every size check fails, "gpt" when only the second check passes
            2. pmac: "mac"
            3. EFI: "gpt"
            4. s390: "msdos" whatever dasd_is_fba reports while the parted
               device type is unset, "dasd" once the parted device type is
               DEVICE_DASD and dasd_is_fba reports False

        :param arch: mock standing in for the arch module
        """
        dl = blivet.formats.disklabel.DiskLabel()
        dl._parted_disk = mock.Mock()
        dl._parted_device = mock.Mock()
        dl._device = "labeltypefakedev"

        # start from a platform where no architecture check is true
        for flag in ("is_s390", "is_efi", "is_aarch64", "is_arm", "is_pmac"):
            getattr(arch, flag).return_value = False

        with mock.patch.object(dl, '_label_type_size_check') as size_check:
            # size check passes for first type ("msdos")
            size_check.return_value = True
            self.assertEqual(dl._get_best_label_type(), "msdos")

            # size checks all fail -> label type is None
            size_check.return_value = False
            self.assertEqual(dl._get_best_label_type(), None)

            # size check passes on second call -> label type is "gpt" (second in platform list)
            size_check.side_effect = [False, True]
            self.assertEqual(dl._get_best_label_type(), "gpt")

        # pmac
        arch.is_pmac.return_value = True
        with mock.patch.object(dl, '_label_type_size_check', return_value=True):
            self.assertEqual(dl._get_best_label_type(), "mac")
        arch.is_pmac.return_value = False

        # EFI
        arch.is_efi.return_value = True
        with mock.patch.object(dl, '_label_type_size_check', return_value=True):
            self.assertEqual(dl._get_best_label_type(), "gpt")
        arch.is_efi.return_value = False

        # s390
        arch.is_s390.return_value = True
        with mock.patch.object(dl, '_label_type_size_check', return_value=True), \
                mock.patch("blivet.formats.disklabel.blockdev.s390") as s390_mock:
            s390_mock.dasd_is_fba.return_value = False
            self.assertEqual(dl._get_best_label_type(), "msdos")

            s390_mock.dasd_is_fba.return_value = True
            self.assertEqual(dl._get_best_label_type(), "msdos")

            s390_mock.dasd_is_fba.return_value = False
            dl._parted_device.type = parted.DEVICE_DASD
            self.assertEqual(dl._get_best_label_type(), "dasd")
        arch.is_s390.return_value = False
开发者ID:rhinstaller,项目名称:blivet,代码行数:57,代码来源:disklabel_test.py


示例17: test_notify

    def test_notify(self):
        """notify_send() must result in two send() calls on the Kafka producer."""
        with mock.patch("kafka.KafkaProducer") as fake_producer_class:
            context = {"fake_ctxt": "fake_param"}
            message = {"fake_text": "fake_message_1"}
            self.driver.pconn.notify_send("fake_topic", context, message, 10)

            # calls recorded on the producer instance the patched class returns
            send_calls = fake_producer_class.return_value.send.mock_calls
            self.assertEqual(2, len(send_calls))
开发者ID:kgiusti,项目名称:oslo.messaging,代码行数:9,代码来源:test_impl_kafka.py


示例18: test_process_response_fail

 def test_process_response_fail(self):
     """Run process_response() while Notifier._notify always raises.

     The test makes no assertions; it passes only if process_response()
     returns without raising.
     """
     def notify_error(context, publisher_id, event_type,
                      priority, payload):
         raise Exception('error')

     notify_patch = mock.patch(
         'oslo_messaging.notify.notifier.Notifier._notify', notify_error)
     with notify_patch:
         m = middleware.RequestNotifier(FakeApp())
         get_environ = {'REQUEST_METHOD': 'GET'}
         req = webob.Request.blank('/foo/bar', environ=get_environ)
         response = webob.response.Response()
         m.process_response(req, response)
开发者ID:openstack,项目名称:oslo.messaging,代码行数:10,代码来源:test_middleware.py


示例19: test_send_notification

    def test_send_notification(self):
        """send_notification() must open a sender on the topic address."""
        target = oslo_messaging.Target(exchange="exchange_test",
                                       topic="topic_test.info")

        with mock.patch('qpid.messaging.Connection') as conn_cls:
            session = conn_cls.return_value.session.return_value

            self.driver.send_notification(target, {}, {}, "2.0")

            expected_address = AddressNodeMatcher(
                "amq.topic/topic/exchange_test/topic_test.info")
            session.sender.assert_called_with(expected_address)
开发者ID:eayunstack,项目名称:oslo.messaging,代码行数:10,代码来源:test_impl_qpid.py


示例20: test_weight_2

    def test_weight_2(self):
        """Check PartitionDevice.weight against every spec in ``weighted``.

        For each spec, the partition gets a mock format built from the spec
        and each function named in ``arch_funcs`` is mocked to return True
        only when it is listed in the spec's ``true_funcs``.
        """
        for spec in weighted:
            part = PartitionDevice('weight_test')
            is_mountable = spec.mountpoint is not None
            part._format = Mock(name="fmt", type=spec.fstype,
                                mountpoint=spec.mountpoint,
                                mountable=is_mountable)

            with patch('blivet.devices.partition.arch') as arch_mock:
                for func_name in arch_funcs:
                    getattr(arch_mock, func_name).return_value = func_name in spec.true_funcs

                self.assertEqual(part.weight, spec.weight)
开发者ID:rhinstaller,项目名称:blivet,代码行数:11,代码来源:partition_test.py



注：本文中的 six.moves.mock.patch 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python queue.Queue类代码示例发布时间:2022-05-27
下一篇:
Python mock.call函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap