• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sd_backend.get_sd_backend函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.service_discovery.sd_backend.get_sd_backend函数的典型用法代码示例。如果您正苦于以下问题:Python get_sd_backend函数的具体用法?Python get_sd_backend怎么用?Python get_sd_backend使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_sd_backend函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _get_events

 def _get_events(self):
     """Get the list of events."""
     events, changed_container_ids = self.docker_util.get_events()
     if not self._disable_net_metrics:
         self._invalidate_network_mapping_cache(events)
     if changed_container_ids and self._service_discovery:
         get_sd_backend(self.agentConfig).update_checks(changed_container_ids)
     if changed_container_ids:
         self.metadata_collector.invalidate_cache(events)
         if Platform.is_nomad():
             self.nomadutil.invalidate_cache(events)
         elif Platform.is_ecs_instance():
             self.ecsutil.invalidate_cache(events)
     return events
开发者ID:dblackdblack,项目名称:integrations-core,代码行数:14,代码来源:check.py


示例2: test_get_host_address

    def test_get_host_address(self, mock_check_yaml, mock_get, *args):
        kubernetes_config = {'instances': [{'kubelet_port': 1337}]}
        pod_list = {
            'items': [{
                'status': {
                    'podIP': '127.0.0.1',
                    'containerStatuses': [
                        {'containerID': 'docker://389dc8a4361f3d6c866e9e9a7b6972b26a31c589c4e2f097375d55656a070bc9'}
                    ]
                }
            }]
        }

        # (inspect, tpl_var, expected_result)
        ip_address_inspects = [
            ({'NetworkSettings': {}}, 'host', None),
            ({'NetworkSettings': {'IPAddress': ''}}, 'host', None),

            ({'NetworkSettings': {'IPAddress': '127.0.0.1'}}, 'host', '127.0.0.1'),
            ({'NetworkSettings': {'IPAddress': '127.0.0.1', 'Networks': {}}}, 'host', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '127.0.0.1',
                'Networks': {'bridge': {'IPAddress': '127.0.0.1'}}}},
             'host', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '',
                'Networks': {'bridge': {'IPAddress': '127.0.0.1'}}}},
             'host_bridge', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '127.0.0.1',
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'},
                    'foo': {'IPAddress': '192.168.0.2'}}}},
             'host', '172.17.0.2'),

            ({'NetworkSettings': {'Networks': {}}}, 'host', None),
            ({'NetworkSettings': {'Networks': {}}}, 'host_bridge', None),
            ({'NetworkSettings': {'Networks': {'bridge': {}}}}, 'host', None),
            ({'NetworkSettings': {'Networks': {'bridge': {}}}}, 'host_bridge', None),
            ({'NetworkSettings': {
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'}
                }}},
             'host_bridge', '172.17.0.2'),
            ({'NetworkSettings': {
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'},
                    'foo': {'IPAddress': '192.168.0.2'}
                }}},
             'host_foo', '192.168.0.2')
        ]

        mock_check_yaml.return_value = kubernetes_config
        mock_get.return_value = Response(pod_list)

        for c_ins, tpl_var, expected_ip in ip_address_inspects:
            state = _SDDockerBackendConfigFetchState(lambda _: c_ins)
            sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
            self.assertEquals(sd_backend._get_host_address(state, 'container id', tpl_var), expected_ip)
            clear_singletons(self.auto_conf_agentConfig)
开发者ID:dblackdblack,项目名称:dd-agent,代码行数:60,代码来源:test_service_discovery.py


示例3: test_render_template

    def test_render_template(self):
        """Test _render_template"""
        valid_configs = [
            (({}, {'host': '%%host%%'}, {'host': 'foo'}),
             ({}, {'host': 'foo'})),
            (({}, {'host': '%%host%%', 'port': '%%port%%'}, {'host': 'foo', 'port': '1337'}),
             ({}, {'host': 'foo', 'port': '1337'})),
            (({'foo': '%%bar%%'}, {}, {'bar': 'w00t'}),
             ({'foo': 'w00t'}, {})),
            (({'foo': '%%bar%%'}, {'host': '%%host%%'}, {'bar': 'w00t', 'host': 'localhost'}),
             ({'foo': 'w00t'}, {'host': 'localhost'}))
        ]

        invalid_configs = [
            ({}, {'host': '%%host%%'}, {}),  # no value to use
            ({}, {'host': '%%host%%'}, {'port': 42}),  # the variable name doesn't match
            ({'foo': '%%bar%%'}, {'host': '%%host%%'}, {'host': 'foo'})  # not enough value/no matching var name
        ]

        with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
            with mock.patch.object(EtcdStore, 'get_client', return_value=None):
                with mock.patch.object(ConsulStore, 'get_client', return_value=None):
                    for agentConfig in self.agentConfigs:
                        sd_backend = get_sd_backend(agentConfig=agentConfig)
                        for tpl, res in valid_configs:
                            init, instance, variables = tpl
                            config = sd_backend._render_template(init, instance, variables)
                            self.assertEquals(config, res)
                        for init, instance, variables in invalid_configs:
                            config = sd_backend._render_template(init, instance, variables)
                            self.assertEquals(config, None)
                            clear_singletons(agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:32,代码来源:test_service_discovery.py


示例4: test_get_port

 def test_get_port(self):
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         for c_ins, _, var_tpl, expected_ports, _ in self.container_inspects:
             sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
             if isinstance(expected_ports, str):
                 self.assertEquals(sd_backend._get_port(c_ins, var_tpl), expected_ports)
             else:
                 self.assertRaises(expected_ports, sd_backend._get_port, c_ins, var_tpl)
             clear_singletons(self.auto_conf_agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:9,代码来源:test_service_discovery.py


示例5: test_get_config_id

 def test_get_config_id(self, mock_get_auto_confd_path):
     """Test get_config_id"""
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         for c_ins, _, _, _, expected_ident, _ in self.container_inspects:
             sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
             self.assertEqual(
                 sd_backend.get_config_id(DockerUtil().image_name_extractor(c_ins), c_ins.get('Config', {}).get('Labels', {})),
                 expected_ident)
             clear_singletons(self.auto_conf_agentConfig)
开发者ID:serverdensity,项目名称:sd-agent,代码行数:9,代码来源:test_service_discovery.py


示例6: test_get_config_id

 def test_get_config_id(self):
     """Test get_config_id"""
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         for c_ins, _, _, _, expected_ident in self.container_inspects:
             sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
             self.assertEqual(
                 sd_backend.get_config_id(c_ins.get('Image'), c_ins.get('Labels', {})),
                 expected_ident)
             clear_singletons(self.auto_conf_agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:9,代码来源:test_service_discovery.py


示例7: test_get_port

 def test_get_port(self, *args):
     for c_ins, _, var_tpl, expected_ports, _ in self.container_inspects:
         state = _SDDockerBackendConfigFetchState(lambda _: c_ins)
         sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
         if isinstance(expected_ports, str):
             self.assertEquals(sd_backend._get_port(state, 'container id', var_tpl), expected_ports)
         else:
             self.assertRaises(expected_ports, sd_backend._get_port, state, 'c_id', var_tpl)
         clear_singletons(self.auto_conf_agentConfig)
开发者ID:dblackdblack,项目名称:dd-agent,代码行数:9,代码来源:test_service_discovery.py


示例8: _service_disco_configs

def _service_disco_configs(agentConfig):
    """ Retrieve all the service disco configs and return their conf dicts
    """
    if agentConfig.get('service_discovery') and agentConfig.get('service_discovery_backend') in SD_BACKENDS:
        sd_backend = get_sd_backend(agentConfig=agentConfig)
        service_disco_configs = sd_backend.get_configs()
    else:
        service_disco_configs = {}

    return service_disco_configs
开发者ID:awasilyev,项目名称:dd-agent,代码行数:10,代码来源:config.py


示例9: test_get_check_configs

 def test_get_check_configs(self, *args):
     """Test get_check_config with mocked container inspect and config template"""
     c_id = self.docker_container_inspect.get('Id')
     for image in self.mock_templates.keys():
         sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
         state = _SDDockerBackendConfigFetchState(_get_container_inspect)
         self.assertEquals(
             sd_backend._get_check_configs(state, c_id, image)[0],
             self.mock_templates[image][1])
         clear_singletons(self.auto_conf_agentConfig)
开发者ID:dblackdblack,项目名称:dd-agent,代码行数:10,代码来源:test_service_discovery.py


示例10: test_get_check_configs

 def test_get_check_configs(self, mock_inspect_container, mock_get_conf_tpls):
     """Test get_check_config with mocked container inspect and config template"""
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         with mock.patch.object(SDDockerBackend, '_get_host_address', return_value='127.0.0.1'):
             with mock.patch.object(SDDockerBackend, '_get_port', return_value='1337'):
                 c_id = self.docker_container_inspect.get('Id')
                 for image in self.mock_templates.keys():
                     sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
                     self.assertEquals(
                         sd_backend._get_check_configs(c_id, image)[0],
                         self.mock_templates[image][1])
                     clear_singletons(self.auto_conf_agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:12,代码来源:test_service_discovery.py


示例11: test_get_config_templates

 def test_get_config_templates(self, *args):
     """Test _get_config_templates with mocked get_check_tpls"""
     for agentConfig in self.agentConfigs:
         sd_backend = get_sd_backend(agentConfig=agentConfig)
         # normal cases
         for image in self.mock_templates.keys():
             template = sd_backend._get_config_templates(image)
             expected_template = self.mock_templates.get(image)[0]
             self.assertEquals(template, expected_template)
         # error cases
         for image in self.bad_mock_templates.keys():
             self.assertEquals(sd_backend._get_config_templates(image), None)
         clear_singletons(agentConfig)
开发者ID:dblackdblack,项目名称:dd-agent,代码行数:13,代码来源:test_service_discovery.py


示例12: _service_disco_configs

def _service_disco_configs(agentConfig):
    """ Retrieve all the service disco configs and return their conf dicts
    """
    if agentConfig.get('service_discovery') and agentConfig.get('service_discovery_backend') in SD_BACKENDS:
        try:
            log.info("Fetching service discovery check configurations.")
            sd_backend = get_sd_backend(agentConfig=agentConfig)
            service_disco_configs = sd_backend.get_configs()
        except Exception:
            log.exception("Loading service discovery configurations failed.")
    else:
        service_disco_configs = {}

    return service_disco_configs
开发者ID:ewdurbin,项目名称:dd-agent,代码行数:14,代码来源:config.py


示例13: test_get_config_templates

 def test_get_config_templates(self, mock_get_check_tpls):
     """Test _get_config_templates with mocked get_check_tpls"""
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         with mock.patch.object(EtcdStore, 'get_client', return_value=None):
             with mock.patch.object(ConsulStore, 'get_client', return_value=None):
                 for agentConfig in self.agentConfigs:
                     sd_backend = get_sd_backend(agentConfig=agentConfig)
                     # normal cases
                     for image in self.mock_templates.keys():
                         template = sd_backend._get_config_templates(image)
                         expected_template = self.mock_templates.get(image)[0]
                         self.assertEquals(template, expected_template)
                     # error cases
                     for image in self.bad_mock_templates.keys():
                         self.assertEquals(sd_backend._get_config_templates(image), None)
                     clear_singletons(agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:16,代码来源:test_service_discovery.py


示例14: test_fill_tpl

 def test_fill_tpl(self):
     """Test _fill_tpl with mock _get_ports"""
     valid_configs = [
         # ((inspect, instance_tpl, variables, tags), (expected_instance_tpl, expected_var_values))
         (
             ({}, {'host': 'localhost'}, [], None),
             ({'host': 'localhost'}, {})
         ),
         (
             ({'NetworkSettings': {'IPAddress': '127.0.0.1'}},
              {'host': '%%host%%', 'port': 1337}, ['host'], ['foo', 'bar:baz']),
             ({'host': '%%host%%', 'port': 1337, 'tags': ['foo', 'bar:baz']}, {'host': '127.0.0.1'})
         ),
         (
             ({'NetworkSettings': {'IPAddress': '127.0.0.1', 'Ports': {'42/tcp': None, '22/tcp': None}}},
              {'host': '%%host%%', 'port': '%%port_1%%', 'tags': ['env:test']},
              ['host', 'port_1'], ['foo', 'bar:baz']),
             ({'host': '%%host%%', 'port': '%%port_1%%', 'tags': ['env:test', 'foo', 'bar:baz']},
              {'host': '127.0.0.1', 'port_1': '42'})
         )
     ]
     with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
         for ac in self.agentConfigs:
             sd_backend = get_sd_backend(agentConfig=ac)
             try:
                 for co in valid_configs:
                     inspect, tpl, variables, tags = co[0]
                     instance_tpl, var_values = sd_backend._fill_tpl(inspect, tpl, variables, tags)
                     for key in instance_tpl.keys():
                         if isinstance(instance_tpl[key], list):
                             self.assertEquals(len(instance_tpl[key]), len(co[1][0].get(key)))
                             for elem in instance_tpl[key]:
                                 self.assertTrue(elem in co[1][0].get(key))
                         else:
                             self.assertEquals(instance_tpl[key], co[1][0].get(key))
                     self.assertEquals(var_values, co[1][1])
                 clear_singletons(ac)
             except Exception:
                 clear_singletons(ac)
                 raise
开发者ID:7040210,项目名称:dd-agent,代码行数:40,代码来源:test_service_discovery.py


示例15: __init__

    def __init__(self, name, init_config, agentConfig, instances=None):
        if instances is not None and len(instances) > 1:
            raise Exception('Kubernetes check only supports one configured instance.')

        AgentCheck.__init__(self, name, init_config, agentConfig, instances)

        inst = instances[0] if instances is not None else None
        self.kubeutil = KubeUtil(instance=inst)
        if not self.kubeutil.kubelet_api_url:
            raise Exception('Unable to reach kubelet. Try setting the host parameter.')

        if agentConfig.get('service_discovery') and \
           agentConfig.get('service_discovery_backend') == 'docker':
            self._sd_backend = get_sd_backend(agentConfig)
        else:
            self._sd_backend = None

        self.k8s_namespace_regexp = None
        if inst:
            regexp = inst.get('namespace_name_regexp', None)
            if regexp:
                try:
                    self.k8s_namespace_regexp = re.compile(regexp)
                except re.error as e:
                    self.log.warning('Invalid regexp for "namespace_name_regexp" in configuration (ignoring regexp): %s' % str(e))

            self._collect_events = _is_affirmative(inst.get('collect_events', DEFAULT_COLLECT_EVENTS))
            if self._collect_events:
                self.event_retriever = self.kubeutil.get_event_retriever()
            elif self.kubeutil.collect_service_tag:
                # Only fetch service and pod events for service mapping
                event_delay = inst.get('service_tag_update_freq', DEFAULT_SERVICE_EVENT_FREQ)
                self.event_retriever = self.kubeutil.get_event_retriever(kinds=['Service', 'Pod'],
                                                                         delay=event_delay)
            else:
                self.event_retriever = None
        else:
            self._collect_events = None
            self.event_retriever = None
开发者ID:dblackdblack,项目名称:integrations-core,代码行数:39,代码来源:check.py


示例16: test_get_host

    def test_get_host(self, mock_check_yaml, mock_get):
        kubernetes_config = {'instances': [{'kubelet_port': 1337}]}
        pod_list = {
            'items': [{
                'status': {
                    'podIP': '127.0.0.1',
                    'containerStatuses': [
                        {'containerID': 'docker://389dc8a4361f3d6c866e9e9a7b6972b26a31c589c4e2f097375d55656a070bc9'}
                    ]
                }
            }]
        }

        mock_check_yaml.return_value = kubernetes_config
        mock_get.return_value = Response(pod_list)

        for c_ins, expected_ip, _ in self.container_inspects:
            with mock.patch.object(AbstractConfigStore, '__init__', return_value=None):
                with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
                    with mock.patch('utils.kubeutil.get_conf_path', return_value=None):
                        sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
                        self.assertEqual(sd_backend._get_host(c_ins), expected_ip)
                        clear_singletons(self.auto_conf_agentConfig)
开发者ID:7040210,项目名称:dd-agent,代码行数:23,代码来源:test_service_discovery.py


示例17: test_get_image_ident

 def test_get_image_ident(self, *args):
     sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
     # normal cases
     for image, ident in self.image_formats.iteritems():
         self.assertEquals(ident, sd_backend.config_store._get_image_ident(image))
开发者ID:serverdensity,项目名称:sd-agent,代码行数:5,代码来源:test_service_discovery.py


示例18: test_fill_tpl


#.........这里部分代码省略.........
                    'Networks': {'bridge': {'IPAddress': '172.17.0.2'}}}},
                 {'host': '%%host_bridge%%', 'port': 1337}, ['host_bridge'], ['foo', 'bar:baz']),
                ({'host': '%%host_bridge%%', 'port': 1337, 'tags': ['foo', 'bar:baz']},
                 {'host_bridge': '172.17.0.2'})
            ),
            # specify index but there is a default IPAddress (there's a specifier, even if it's wrong, walking networks should be preferred)
            (
                ({'NetworkSettings': {
                    'IPAddress': '127.0.0.1',
                    'Networks': {'bridge': {'IPAddress': '172.17.0.2'}}}},
                 {'host': '%%host_0%%', 'port': 1337}, ['host_0'], ['foo', 'bar:baz']),
                ({'host': '%%host_0%%', 'port': 1337, 'tags': ['foo', 'bar:baz']}, {'host_0': '172.17.0.2'}),
            ),
            # missing key for host, bridge network should be preferred
            (
                ({'NetworkSettings': {'Networks': {
                    'bridge': {'IPAddress': '127.0.0.1'},
                    'foo': {'IPAddress': '172.17.0.2'}}}},
                 {'host': '%%host_bar%%', 'port': 1337}, ['host_bar'], []),
                ({'host': '%%host_bar%%', 'port': 1337}, {'host_bar': '127.0.0.1'}),
            ),
            # missing index for port
            (
                ({'NetworkSettings': {'IPAddress': '127.0.0.1', 'Ports': {'42/tcp': None, '22/tcp': None}}},
                 {'host': '%%host%%', 'port': '%%port_2%%', 'tags': ['env:test']},
                 ['host', 'port_2'], ['foo', 'bar:baz']),
                ({'host': '%%host%%', 'port': '%%port_2%%', 'tags': ['env:test', 'foo', 'bar:baz']},
                 {'host': '127.0.0.1', 'port_2': '42'})
            )
        ]

        # should raise
        invalid_config = [
            # ((inspect, instance_tpl, variables, tags), expected_exception)

            # template variable but no IPAddress available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': '%%host%%', 'port': 1337}, ['host'], ['foo', 'bar:baz']),
                Exception,
            ),
            # index but no IPAddress available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': '%%host_0%%', 'port': 1337}, ['host_0'], ['foo', 'bar:baz']),
                Exception,
            ),
            # key but no IPAddress available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': '%%host_foo%%', 'port': 1337}, ['host_foo'], ['foo', 'bar:baz']),
                Exception,
            ),

            # template variable but no port available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': 'localhost', 'port': '%%port%%'}, ['port'], []),
                Exception,
            ),
            # index but no port available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': 'localhost', 'port_0': '%%port%%'}, ['port_0'], []),
                Exception,
            ),
            # key but no port available
            (
                ({'NetworkSettings': {'Networks': {}}},
                 {'host': 'localhost', 'port': '%%port_foo%%'}, ['port_foo'], []),
                Exception,
            )
        ]

        with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
            with mock.patch.object(EtcdStore, 'get_client', return_value=None):
                with mock.patch.object(ConsulStore, 'get_client', return_value=None):
                    for ac in self.agentConfigs:
                        sd_backend = get_sd_backend(agentConfig=ac)
                        try:
                            for co in valid_configs + edge_cases:
                                inspect, tpl, variables, tags = co[0]
                                instance_tpl, var_values = sd_backend._fill_tpl(inspect, tpl, variables, tags)
                                for key in instance_tpl.keys():
                                    if isinstance(instance_tpl[key], list):
                                        self.assertEquals(len(instance_tpl[key]), len(co[1][0].get(key)))
                                        for elem in instance_tpl[key]:
                                            self.assertTrue(elem in co[1][0].get(key))
                                    else:
                                        self.assertEquals(instance_tpl[key], co[1][0].get(key))
                                self.assertEquals(var_values, co[1][1])

                            for co in invalid_config:
                                inspect, tpl, variables, tags = co[0]
                                self.assertRaises(co[1], sd_backend._fill_tpl(inspect, tpl, variables, tags))

                            clear_singletons(ac)
                        except Exception:
                            clear_singletons(ac)
                            raise
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:101,代码来源:test_service_discovery.py


示例19: test_get_host_address

    def test_get_host_address(self, mock_check_yaml, mock_get):
        kubernetes_config = {'instances': [{'kubelet_port': 1337}]}
        pod_list = {
            'items': [{
                'status': {
                    'podIP': '127.0.0.1',
                    'containerStatuses': [
                        {'containerID': 'docker://389dc8a4361f3d6c866e9e9a7b6972b26a31c589c4e2f097375d55656a070bc9'}
                    ]
                }
            }]
        }

        # (inspect, tpl_var, expected_result)
        ip_address_inspects = [
            ({'NetworkSettings': {}}, 'host', None),
            ({'NetworkSettings': {'IPAddress': ''}}, 'host', None),

            ({'NetworkSettings': {'IPAddress': '127.0.0.1'}}, 'host', '127.0.0.1'),
            ({'NetworkSettings': {'IPAddress': '127.0.0.1', 'Networks': {}}}, 'host', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '127.0.0.1',
                'Networks': {'bridge': {'IPAddress': '127.0.0.1'}}}},
             'host', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '',
                'Networks': {'bridge': {'IPAddress': '127.0.0.1'}}}},
             'host_bridge', '127.0.0.1'),
            ({'NetworkSettings': {
                'IPAddress': '127.0.0.1',
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'},
                    'foo': {'IPAddress': '192.168.0.2'}}}},
             'host', '127.0.0.1'),

            ({'NetworkSettings': {'Networks': {}}}, 'host', None),
            ({'NetworkSettings': {'Networks': {}}}, 'host_bridge', None),
            ({'NetworkSettings': {'Networks': {'bridge': {}}}}, 'host', None),
            ({'NetworkSettings': {'Networks': {'bridge': {}}}}, 'host_bridge', None),
            ({'NetworkSettings': {
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'}
                }}},
             'host_bridge', '172.17.0.2'),
            ({'NetworkSettings': {
                'Networks': {
                    'bridge': {'IPAddress': '172.17.0.2'},
                    'foo': {'IPAddress': '192.168.0.2'}
                }}},
             'host_foo', '192.168.0.2')
        ]

        mock_check_yaml.return_value = kubernetes_config
        mock_get.return_value = Response(pod_list)

        for c_ins, tpl_var, expected_ip in ip_address_inspects:
            with mock.patch.object(AbstractConfigStore, '__init__', return_value=None):
                with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None):
                    with mock.patch('utils.kubeutil.get_conf_path', return_value=None):
                        sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig)
                        self.assertEquals(sd_backend._get_host_address(c_ins, tpl_var), expected_ip)
                        clear_singletons(self.auto_conf_agentConfig)
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:62,代码来源:test_service_discovery.py


示例20: get_os

    osname = get_os()
    checks_paths = get_checks_paths(agentConfig, osname)
    # this can happen if check.d is not found
    if checks_paths is None:
        log.error('Check directory not found, exiting. The agent is likely misconfigured.')
        sys.exit(3)

    try:
        confd_path = get_confd_path(osname)
    except PathNotFound, e:
        log.error("No conf.d folder found at '%s' or in the directory where "
                  "the Agent is currently deployed.\n" % e.args[0])
        sys.exit(3)

    if agentConfig.get('service_discovery') and agentConfig.get('service_discovery_backend') in SD_BACKENDS:
        sd_backend = get_sd_backend(agentConfig=agentConfig)
        service_disco_configs = sd_backend.get_configs()
    else:
        service_disco_configs = {}

    # We don't support old style configs anymore
    # So we iterate over the files in the checks.d directory
    # If there is a matching configuration file in the conf.d directory
    # then we import the check
    for check in itertools.chain(*checks_paths):
        sd_init_config, sd_instances, skip_config_lookup = None, None, False
        check_name = os.path.basename(check).split('.')[0]
        check_config = None
        if check_name in initialized_checks or check_name in init_failed_checks:
            log.debug('Skipping check %s because it has already been loaded from another location', check)
            continue
开发者ID:abhilash07,项目名称:dd-agent,代码行数:31,代码来源:config.py



注:本文中的utils.service_discovery.sd_backend.get_sd_backend函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sessioninfomanager.checkSessionInfo函数代码示例发布时间:2022-05-26
下一篇:
Python config_stores.get_config_store函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap