• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python registry.subscribe函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.callbacks.registry.subscribe函数的典型用法代码示例。如果您正苦于以下问题：Python subscribe函数的具体用法？Python subscribe怎么用？Python subscribe使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了subscribe函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: register

def register(callback, agent_type):
    """Hook ``callback`` up to the AFTER_INIT event of an agent type.

    :param callback: callable handed to the callbacks registry; it is
        subscribed for ``events.AFTER_INIT``.
    :param agent_type: value used as the resource of the subscription
        (presumably one of the agent types from neutron_lib.constants).
    """
    init_event = events.AFTER_INIT
    registry.subscribe(callback, agent_type, init_event)
开发者ID:cloudbase,项目名称:neutron,代码行数:7,代码来源:capabilities.py


示例2: register

def register():
    """Create the trunk driver and subscribe its bridge-name handler.

    The object returned by ``OVSDriver.create()`` is stored in the
    module-level ``DRIVER``; afterwards ``vif_details_bridge_name_handler``
    is subscribed to ``events.BEFORE_READ`` on
    ``agent_consts.OVS_BRIDGE_NAME``.
    """
    global DRIVER
    DRIVER = OVSDriver.create()
    # The handler sets the bridge_name in a parent port's vif_details.
    registry.subscribe(
        vif_details_bridge_name_handler,
        agent_consts.OVS_BRIDGE_NAME,
        events.BEFORE_READ,
    )
    LOG.debug("Open vSwitch trunk driver registered")
开发者ID:sebrandon1,项目名称:neutron,代码行数:7,代码来源:driver.py


示例3: test_security_group_precommit_create_event_fail

 def test_security_group_precommit_create_event_fail(self):
     """Creation fails and rolls back with a PRECOMMIT_CREATE subscriber.

     With ``fake_callback`` subscribed, creating a security group is
     expected to raise ``SecurityGroupConflict`` and to call ``rollback``
     on the session transaction.
     """
     registry.subscribe(
         fake_callback, resources.SECURITY_GROUP, events.PRECOMMIT_CREATE)
     transaction_cls = sqlalchemy.orm.session.SessionTransaction
     with mock.patch.object(transaction_cls, "rollback") as mock_rollback:
         self.assertRaises(
             securitygroup.SecurityGroupConflict,
             self.mixin.create_security_group,
             self.ctx,
             FAKE_SECGROUP)
         # The failed create must have triggered a rollback.
         self.assertTrue(mock_rollback.called)
开发者ID:FedericoRessi,项目名称:neutron,代码行数:7,代码来源:test_securitygroups_db.py


示例4: subscribe

def subscribe():
    """Subscribe the segment/host mapping updater to agent events.

    ``_update_segment_host_mapping_for_agent`` is registered on
    ``resources.AGENT`` for ``AFTER_CREATE`` first and ``AFTER_UPDATE``
    second.
    """
    for agent_event in (events.AFTER_CREATE, events.AFTER_UPDATE):
        registry.subscribe(
            _update_segment_host_mapping_for_agent,
            resources.AGENT,
            agent_event)
开发者ID:andreitira,项目名称:neutron,代码行数:7,代码来源:db.py


示例5: __init__

    def __init__(self):
        """Build the Nova client and batch notifier, then subscribe callbacks.

        Auth and session are loaded from the ``nova`` group of ``cfg.CONF``.
        The client is created with only the ``server_external_events``
        extension, and ``self._send_nova_notification`` is subscribed to
        ``events.BEFORE_RESPONSE`` for floating IPs and ports.
        """
        # FIXME(jamielennox): A notifier is being created for each Controller
        # and each Notifier is handling its own auth. That means that we are
        # authenticating the exact same thing len(controllers) times. This
        # should be an easy thing to optimize.
        # FIXME(kevinbenton): remove this comment and the one above once the
        # switch to pecan is complete since only one notifier is constructed
        # in the pecan notification hook.
        nova_auth = ks_loading.load_auth_from_conf_options(cfg.CONF, 'nova')
        nova_session = ks_loading.load_session_from_conf_options(
            cfg.CONF, 'nova', auth=nova_auth)

        # Only the external-events extension is handed to the client.
        wanted_extensions = [
            candidate
            for candidate in nova_client.discover_extensions(NOVA_API_VERSION)
            if candidate.name == "server_external_events"
        ]
        nova_conf = cfg.CONF.nova
        self.nclient = nova_client.Client(
            NOVA_API_VERSION,
            session=nova_session,
            region_name=nova_conf.region_name,
            endpoint_type=nova_conf.endpoint_type,
            extensions=wanted_extensions)
        self.batch_notifier = batch_notifier.BatchNotifier(
            cfg.CONF.send_events_interval, self.send_events)

        # Register callbacks for events on resources that affect Nova.
        for nova_resource in (resources.FLOATING_IP, resources.PORT):
            registry.subscribe(
                self._send_nova_notification,
                nova_resource,
                events.BEFORE_RESPONSE)
开发者ID:biruce-openstack,项目名称:neutron,代码行数:35,代码来源:nova.py


示例6: setUp

 def setUp(self):
     """Prepare a context, a port and a PROVISIONING_COMPLETE listener.

     The mock stored in ``self.provisioned`` is subscribed on
     ``resources.PORT`` so tests can inspect whether it was called.
     """
     super(TestStatusBarriers, self).setUp()
     self.ctx = n_ctx.get_admin_context()
     listener = mock.Mock()
     self.provisioned = listener
     self.port = self._make_port()
     registry.subscribe(
         listener, resources.PORT, pb.PROVISIONING_COMPLETE)
开发者ID:21atlas,项目名称:neutron,代码行数:7,代码来源:test_provisioning_blocks.py


示例7: __init__

 def __init__(self, l3_agent):
     """Copy metadata settings from the agent and subscribe router hooks.

     :param l3_agent: object whose ``conf`` provides ``metadata_port`` and
         ``metadata_access_mark``.
     """
     agent_conf = l3_agent.conf
     self.metadata_port = agent_conf.metadata_port
     self.metadata_access_mark = agent_conf.metadata_access_mark
     # (callback, event) pairs, subscribed on the router resource in order.
     router_hooks = (
         (after_router_added, events.AFTER_CREATE),
         (before_router_removed, events.BEFORE_DELETE),
     )
     for hook, router_event in router_hooks:
         registry.subscribe(hook, resources.ROUTER, router_event)
开发者ID:Intellifora,项目名称:neutron,代码行数:7,代码来源:driver.py


示例8: setup_sg_rpc_callbacks

    def setup_sg_rpc_callbacks(self):
        """Subscribe security group callbacks and start a notification listener.

        The ``bsn_*_callback`` methods are stored on the instance and then
        subscribed to the matching security group / security group rule
        events. Afterwards a notification listener on topic ``'#'`` (default
        and ``keystone`` exchanges) is created with this object as endpoint
        and started.
        """
        # This way of registering callback functions started in kilo.
        self._create_sg_f = self.bsn_create_sg_callback
        self._delete_sg_f = self.bsn_delete_sg_callback
        self._update_sg_f = self.bsn_update_sg_callback
        self._create_sg_rule_f = self.bsn_create_sg_rule_callback
        self._delete_sg_rule_f = self.bsn_delete_sg_rule_callback

        # (callback, resource, event) triples, subscribed in this order.
        sg_subscriptions = (
            (self._create_sg_f,
             resources.SECURITY_GROUP, events.AFTER_CREATE),
            (self._delete_sg_f,
             resources.SECURITY_GROUP, events.AFTER_DELETE),
            (self._update_sg_f,
             resources.SECURITY_GROUP, events.AFTER_UPDATE),
            (self._create_sg_rule_f,
             resources.SECURITY_GROUP_RULE, events.AFTER_CREATE),
            (self._delete_sg_rule_f,
             resources.SECURITY_GROUP_RULE, events.AFTER_DELETE),
        )
        for sg_callback, sg_resource, sg_event in sg_subscriptions:
            registry.subscribe(sg_callback, sg_resource, sg_event)

        # The above does not cover the cases where security groups are
        # initially created or when they are deleted since those actions
        # aren't needed by the L2 agent. In order to receive those, we
        # subscribe to the notifications topic that receives all of the
        # API create/update/delete events.
        # Notifications are published at the 'info' level so they will result
        # in a call to the 'info' function of this object. From there we can
        # check the event type and determine what to do.
        this_host = cfg.CONF.host
        target = oslo_messaging.Target(topic='#', server=this_host)
        keystone_target = oslo_messaging.Target(
            topic='#', exchange='keystone', server=this_host)
        self.listener = oslo_messaging.get_notification_listener(
            n_rpc.TRANSPORT,
            [target, keystone_target],
            [self],
            executor='eventlet',
            allow_requeue=False)
        self.listener.start()
开发者ID:wolverineav,项目名称:networking-bigswitch,代码行数:34,代码来源:driver.py


示例9: subscribe

    def subscribe(self):
        """Subscribe the security group handlers to their AFTER_* events.

        Each handler method is first stored on the instance and the stored
        reference is what gets passed to the registry.
        """
        # REVISIT(Ryu): Keep an extra set of strong references to the callback
        # methods so that they are not prematurely garbage collected.
        # This hack is required in the Liberty branch (not stable/kilo) because
        # currently the Liberty mechanism driver is expected to work with Kilo
        # ML2 plugin for MidoNet, and in Kilo ML2, there is a bug in which
        # these methods were referenced as weakrefs where a garbage collection
        # could remove these callbacks if they were part of an object.
        self._create_sg_f = self.create_security_group
        self._update_sg_f = self.update_security_group
        self._delete_sg_f = self.delete_security_group
        self._create_sgr_f = self.create_security_group_rule
        self._delete_sgr_f = self.delete_security_group_rule

        # (callback, resource, event) triples, subscribed in this order.
        subscriptions = (
            (self._create_sg_f,
             resources.SECURITY_GROUP, events.AFTER_CREATE),
            (self._update_sg_f,
             resources.SECURITY_GROUP, events.AFTER_UPDATE),
            (self._delete_sg_f,
             resources.SECURITY_GROUP, events.AFTER_DELETE),
            (self._create_sgr_f,
             resources.SECURITY_GROUP_RULE, events.AFTER_CREATE),
            (self._delete_sgr_f,
             resources.SECURITY_GROUP_RULE, events.AFTER_DELETE),
        )
        for sg_callback, sg_resource, sg_event in subscriptions:
            registry.subscribe(sg_callback, sg_resource, sg_event)
开发者ID:Prabhat2015,项目名称:networking-midonet,代码行数:27,代码来源:sg_callback.py


示例10: __init__

 def __init__(self):
     """Set up the trunk plugin and announce that it has been initialised.

     Registers ``_extend_port_trunk_details`` as a dict-extend function for
     ports, subscribes the port deletion rules to ``BEFORE_DELETE`` on
     ports, and finally notifies ``AFTER_INIT`` for the trunk plugin with
     this instance as trigger.
     """
     port_extenders = [_extend_port_trunk_details]
     db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
         attributes.PORTS, port_extenders)
     self._segmentation_types = {}
     registry.subscribe(
         rules.enforce_port_deletion_rules,
         resources.PORT,
         events.BEFORE_DELETE)
     # Notify only after the instance state above is in place.
     registry.notify(constants.TRUNK_PLUGIN, events.AFTER_INIT, self)
     LOG.debug('Trunk plugin loaded')
开发者ID:electrocucaracha,项目名称:neutron,代码行数:8,代码来源:plugin.py


示例11: test_treat_devices_removed_notify

 def test_treat_devices_removed_notify(self):
     """Removing a device calls the PORT_DEVICE AFTER_DELETE subscriber once.

     The subscriber must receive the agent as third positional argument
     and ``DEVICE_1`` as the ``device`` keyword.
     """
     delete_listener = mock.Mock()
     registry.subscribe(
         delete_listener, resources.PORT_DEVICE, events.AFTER_DELETE)
     self.agent.treat_devices_removed([DEVICE_1])
     expected_kwargs = {
         'context': mock.ANY,
         'device': DEVICE_1,
         'port_id': mock.ANY,
     }
     delete_listener.assert_called_once_with(
         mock.ANY, mock.ANY, self.agent, **expected_kwargs)
开发者ID:annp,项目名称:neutron,代码行数:8,代码来源:test__common_agent.py


示例12: __init__

    def __init__(self, trunk_manager):
        """Store collaborators and subscribe to OVSDB AFTER_READ events.

        :param trunk_manager: object kept as ``self.trunk_manager``.
        """
        self._context = n_context.get_admin_context_without_session()
        self.trunk_manager = trunk_manager
        self.trunk_rpc = agent.TrunkStub()

        ovsdb_resource = ovs_agent_constants.OVSDB_RESOURCE
        registry.subscribe(
            self.process_trunk_port_events, ovsdb_resource, events.AFTER_READ)
开发者ID:coreycb,项目名称:neutron,代码行数:8,代码来源:ovsdb_handler.py


示例13: __init__

 def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
     """Create the RPC client and subscribe to router interface changes.

     :param topic: messaging topic used for the RPC target (version '1.0').
     :param plugin: object stored as ``self._plugin``; may be ``None``.
     """
     self._plugin = plugin
     rpc_target = oslo_messaging.Target(topic=topic, version='1.0')
     self.client = n_rpc.get_client(rpc_target)
     # (callback, event) pairs for router interface changes, in order.
     interface_hooks = (
         (self._after_router_interface_created, events.AFTER_CREATE),
         (self._after_router_interface_deleted, events.AFTER_DELETE),
     )
     for hook, interface_event in interface_hooks:
         registry.subscribe(
             hook, resources.ROUTER_INTERFACE, interface_event)
开发者ID:ISCAS-VDI,项目名称:neutron-base,代码行数:9,代码来源:dhcp_rpc_agent_api.py


示例14: __init__

    def __init__(self, service_plugin):
        """Initialise the driver and subscribe to port update/delete events.

        :param service_plugin: forwarded unchanged to the parent constructor.
        """
        super(BaGPipeBGPVPNDriver, self).__init__(service_plugin)

        self.agent_rpc = rpc_client.BGPVPNAgentNotifyApi()

        # (callback, event) pairs subscribed on the port resource, in order.
        port_hooks = (
            (self.registry_port_updated, events.AFTER_UPDATE),
            (self.registry_port_deleted, events.AFTER_DELETE),
        )
        for hook, port_event in port_hooks:
            registry.subscribe(hook, resources.PORT, port_event)
开发者ID:nikesh-mahalka,项目名称:networking-bgpvpn,代码行数:10,代码来源:bagpipe.py


示例15: test_security_group_precommit_delete_event_fail

 def test_security_group_precommit_delete_event_fail(self):
     """Deletion fails and rolls back with a PRECOMMIT_DELETE subscriber.

     With ``fake_callback`` subscribed, deleting a freshly created
     security group is expected to raise ``SecurityGroupInUse`` and to
     call ``rollback`` on the session transaction.
     """
     registry.subscribe(
         fake_callback, resources.SECURITY_GROUP, events.PRECOMMIT_DELETE)
     created_sg = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
     transaction_cls = sqlalchemy.orm.session.SessionTransaction
     with mock.patch.object(transaction_cls, 'rollback') as mock_rollback:
         self.assertRaises(
             securitygroup.SecurityGroupInUse,
             self.mixin.delete_security_group,
             self.ctx,
             created_sg['id'])
         # The failed delete must have triggered a rollback.
         self.assertTrue(mock_rollback.called)
开发者ID:HoratiusTang,项目名称:neutron,代码行数:10,代码来源:test_securitygroups_db.py


示例16: __init__

 def __init__(self):
     """Initialise the plugin, its VIF settings, callbacks and notifiers.

     ``self.post_fork_initialize`` is subscribed to ``AFTER_CREATE`` on
     ``resources.PROCESS`` before DHCP and the RPC notifiers are set up.
     """
     super(OVNPlugin, self).__init__()
     LOG.info(_("Starting OVNPlugin"))
     self.vif_type = portbindings.VIF_TYPE_OVS
     # When set to True, Nova plugs the VIF directly into the ovs bridge
     # instead of using the hybrid mode.
     port_filter_details = {portbindings.CAP_PORT_FILTER: True}
     self.vif_details = port_filter_details
     registry.subscribe(
         self.post_fork_initialize,
         resources.PROCESS,
         events.AFTER_CREATE)
     self._setup_dhcp()
     self._start_rpc_notifiers()
开发者ID:vdham,项目名称:networking-ovn,代码行数:10,代码来源:plugin.py


示例17: register_legacy_notification_callbacks

    def register_legacy_notification_callbacks(self, legacy_interface):
        """Emulate the server-side notifications from ml2 AgentNotifierApi.

        :param legacy_interface: object with 'delete'/'update' methods for
            core resources; it is stored as ``self._legacy_interface``.

        ``self._legacy_notifier`` is subscribed for every combination of
        port/network and AFTER_UPDATE/AFTER_DELETE.
        """
        self._legacy_interface = legacy_interface
        legacy_events = (
            callback_events.AFTER_UPDATE,
            callback_events.AFTER_DELETE,
        )
        legacy_resources = (resources.PORT, resources.NETWORK)
        # Events vary slowest, matching the original nested-loop order.
        subscriptions = [
            (legacy_resource, legacy_event)
            for legacy_event in legacy_events
            for legacy_resource in legacy_resources
        ]
        for legacy_resource, legacy_event in subscriptions:
            registry.subscribe(
                self._legacy_notifier, legacy_resource, legacy_event)
开发者ID:eayunstack,项目名称:neutron,代码行数:10,代码来源:rpc.py


示例18: test_record_resource_delete_ignores_dups

 def test_record_resource_delete_ignores_dups(self):
     """A repeated delete of the same resource id sends no new event.

     Deleting ids 3 and 4 each reach the AFTER_DELETE subscriber; deleting
     id 3 again must leave the number of received calls at two.
     """
     received_kw = []
     record_kwargs = lambda *a, **k: received_kw.append(k)
     registry.subscribe(record_kwargs, 'goose', events.AFTER_DELETE)
     # (resource id to delete, total notifications expected afterwards)
     steps = ((3, 1), (4, 2), (3, 2))
     for resource_id, expected_total in steps:
         self.rcache.record_resource_delete(self.ctx, 'goose', resource_id)
         self.assertEqual(expected_total, len(received_kw))
开发者ID:eayunstack,项目名称:neutron,代码行数:10,代码来源:test_resource_cache.py


示例19: test_adding_component_for_new_resource_type

 def test_adding_component_for_new_resource_type(self):
     """A component can only be added once the type's model is registered.

     Adding a provisioning component for 'NETWORK' must raise
     ``RuntimeError`` before ``add_model_for_resource`` is called; after
     registration, completing the component calls the subscriber.
     """
     resource_type = 'NETWORK'
     provisioned = mock.Mock()
     registry.subscribe(
         provisioned, resource_type, pb.PROVISIONING_COMPLETE)
     net = self._make_net()
     # Expected to fail because the model is not registered for the type.
     with testtools.ExpectedException(RuntimeError):
         pb.add_provisioning_component(
             self.ctx, net.id, resource_type, 'ent')
     pb.add_model_for_resource(resource_type, models_v2.Network)
     pb.add_provisioning_component(self.ctx, net.id, resource_type, 'ent')
     pb.provisioning_complete(self.ctx, net.id, resource_type, 'ent')
     self.assertTrue(provisioned.called)
开发者ID:21atlas,项目名称:neutron,代码行数:11,代码来源:test_provisioning_blocks.py


示例20: register

 def register(self, resource, event, trigger, **kwargs):
     """Register the driver and subscribe the trunk handler's callbacks.

     All arguments are forwarded unchanged to the parent ``register``.
     A new ``OVNTrunkHandler`` is then stored as ``self._handler`` and its
     ``trunk_event`` / ``subport_event`` methods are subscribed to
     AFTER_CREATE and AFTER_DELETE for trunks and subports respectively.
     """
     super(OVNTrunkDriver, self).register(
         resource, event, trigger, **kwargs)
     self._handler = OVNTrunkHandler(self.plugin_driver)
     # A distinct loop name avoids rebinding the ``event`` parameter.
     for trunk_event in (events.AFTER_CREATE, events.AFTER_DELETE):
         registry.subscribe(
             self._handler.trunk_event, trunk_consts.TRUNK, trunk_event)
         registry.subscribe(
             self._handler.subport_event,
             trunk_consts.SUBPORTS,
             trunk_event)
开发者ID:flavio-fernandes,项目名称:networking-ovn,代码行数:11,代码来源:trunk_driver.py



注:本文中的neutron.callbacks.registry.subscribe函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python checks.arp_header_match_supported函数代码示例发布时间:2022-05-27
下一篇:
Python registry.notify函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap