• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python common.load_check函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.common.load_check函数的典型用法代码示例。如果您正苦于以下问题：Python load_check函数的具体用法？Python load_check怎么用？Python load_check使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了load_check函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: testInit

    def testInit(self):
        """Check that snmp init_config options reach the check's command generator."""
        self.config = {"init_config": {'mibs_folder': '/etc/mibs'}}
        # Load the check from checks.d with a custom MIB folder configured
        self.check = load_check('snmp', self.config, self.agentConfig)

        mib_builder = self.check.cmd_generator.snmpEngine.msgAndPduDsp\
                .mibInstrumController.mibBuilder
        # The configured folder has to show up among the MIB sources
        self.assertTrue(any('/etc/mibs' == source.fullPath()
                            for source in mib_builder.getMibSources()))
        # Option not given in init_config: the flag is expected to be falsy
        self.assertFalse(self.check.cmd_generator.ignoreNonIncreasingOid)

        # Reload with the option switched on and expect the flag to follow
        self.config = {"init_config": {"ignore_nonincreasing_oid": True}}
        self.check = load_check('snmp', self.config, self.agentConfig)
        self.assertTrue(self.check.cmd_generator.ignoreNonIncreasingOid)
开发者ID:lgtml,项目名称:dd-agent,代码行数:26,代码来源:test_snmp.py


示例2: test_redis_auth

    def test_redis_auth(self):
        """Run the redisdb check with a correct and with wrong passwords."""
        # Test connection with password; nothing is done when is_travis() is true
        if self.is_travis():
            return

        # correct password
        check = load_check('redisdb', {}, {})
        good_instance = {
            'host': 'localhost',
            'port': AUTH_PORT,
            'password': 'datadog-is-devops-best-friend'
        }
        check.check(good_instance)
        metrics = self._sort_metrics(check.get_metrics())
        assert len(metrics) > 0, "No metrics returned"

        # wrong passwords: a fresh check per attempt must yield no metrics
        for bad_password in ('', 'badpassword'):
            bad_instance = {
                'host': 'localhost',
                'port': AUTH_PORT,
                'password': bad_password
            }
            check = load_check('redisdb', {}, {})
            check.check(bad_instance)
            metrics = self._sort_metrics(check.get_metrics())
            assert len(metrics) == 0, "Should have failed with bad password; got %s instead" % metrics
开发者ID:Qobuz,项目名称:dd-agent,代码行数:32,代码来源:test_redis.py


示例3: testElasticChecksD

    def testElasticChecksD(self):
        """Run the elastic check loaded from checks.d and inspect its output.

        Skipped unconditionally for now; see the issue linked in the SkipTest.
        """
        raise SkipTest("See https://github.com/DataDog/dd-agent/issues/825")
        # NOTE: nothing below runs while the SkipTest above is in place.
        agent_config = {
            'elasticsearch': 'http://localhost:%s' % PORT,
            'version': '0.1',
            'api_key': 'toto',
        }

        # Initialize the check from checks_d: a first load is only used to
        # convert the agent config, the second load receives the result
        bootstrap = load_check('elastic', {'init_config': {}, 'instances': {}}, agent_config)
        conf = bootstrap.parse_agent_config(agent_config)
        self.check = load_check('elastic', conf, agent_config)

        self.check.check(conf['instances'][0])
        r = self.check.get_metrics()

        self.assertIsInstance(r, list)
        self.assertTrue(len(r) > 0)
        # Each of these metric names must be reported exactly once
        for metric_name in (
            "elasticsearch.get.total",
            "elasticsearch.search.fetch.total",
            "jvm.gc.collection_time",
            "jvm.mem.heap_committed",
            "jvm.mem.heap_used",
            "jvm.threads.count",
            "jvm.threads.peak_count",
            "elasticsearch.transport.rx_count",
            "elasticsearch.transport.tx_size",
            "elasticsearch.transport.server_open",
            "elasticsearch.thread_pool.snapshot.queue",
            "elasticsearch.active_shards",
        ):
            matching = [t for t in r if t[0] == metric_name]
            self.assertEqual(len(matching), 1, r)

        # Record a "red" status for the instance url, run again and expect
        # exactly one event
        self.check.cluster_status[conf['instances'][0].get('url')] = "red"
        self.check.check(conf['instances'][0])
        events = self.check.get_events()
        self.assertEqual(len(events), 1, events)
开发者ID:jobrs,项目名称:monasca-agent,代码行数:34,代码来源:test_elastic.py


示例4: setUp

    def setUp(self):
        """Patch psutil with a single fake process and load the process check."""
        self.psutil_process_patcher = mock.patch('psutil.Process')
        self.psutil_process_iter_patcher = mock.patch('psutil.process_iter')

        self.mock_process = self.psutil_process_patcher.start()
        self.mock_process_iter = self.psutil_process_iter_patcher.start()

        # Values handed back by the fake process
        memory_info = mock.Mock(rss=1048576)
        io_counters = mock.Mock(read_count=1,
                                write_count=1,
                                read_bytes=1024,
                                write_bytes=1024)
        fake_process = mock.Mock(**{
            'name.return_value': 'process_name',
            'pid': 1234,
            'username.return_value': 'user',
            'cmdline.return_value': '/usr/bin/process_name',
            'memory_info_ex.return_value': memory_info,
            'num_threads.return_value': 1,
            'num_fds.return_value': 1,
            'cpu_percent.return_value': 1,
            'io_counters.return_value': io_counters,
        })

        # Both psutil entry points resolve to the same fake process
        self.mock_process_iter.return_value = [fake_process]
        self.mock_process.return_value = fake_process

        instance = {
            'name': 'test',
            'search_string': ['process_name'],
            'detailed': True,
        }
        config = {'init_config': {}, 'instances': [instance]}
        self.check = load_check('process', config)
开发者ID:Chillisystems,项目名称:monasca-agent,代码行数:30,代码来源:test_process.py


示例5: test_service_checks

    def test_service_checks(self):
        """Expect one OK and one CRITICAL service check from two couch instances."""
        reachable = {'server': 'http://localhost:5984'}
        failing = {'server': 'http://localhost:5985'}
        config = {'instances': [reachable, failing]}
        agentConfig = {'version': '0.1', 'api_key': 'toto'}

        self.check = load_check('couch', config, agentConfig)
        # First instance must run cleanly, second one must raise
        self.check.check(config['instances'][0])
        self.assertRaises(Exception, self.check.check, config['instances'][1])

        service_checks = self.check.get_service_checks()
        self.assertEqual(len(service_checks), 2)

        # Service checks are expected in the order the instances were run
        expected_statuses = (AgentCheck.OK, AgentCheck.CRITICAL)
        for svc_check, expected_status in zip(service_checks, expected_statuses):
            self.assertEqual(svc_check['check'], self.check.SERVICE_CHECK_NAME)
            self.assertEqual(svc_check['status'], expected_status)
开发者ID:Onewaysidewalks,项目名称:dd-agent,代码行数:25,代码来源:test_couch.py


示例6: test_redis_default

    def test_redis_default(self):
        """Run the redisdb check on the password-less instance and inspect its metrics."""
        # Base test, uses the noauth instance
        port = DEFAULT_PORT if self.is_travis() else NOAUTH_PORT
        instance = {"host": "localhost", "port": port}

        # Reset Datadog's test db (14) and write two plain keys plus one
        # key set through setex
        db = redis.Redis(port=port, db=14)
        db.flushdb()
        for plain_key in ("key1", "key2"):
            db.set(plain_key, "value")
        db.setex("expirekey", "expirevalue", 1000)

        r = load_check("redisdb", {}, {})
        r.check(instance)
        metrics = self._sort_metrics(r.get_metrics())
        assert metrics, "No metrics returned"

        # Assert we have values, timestamps and tags for each metric.
        required_tags = ["redis_host:localhost", "redis_port:%s" % port]
        for metric in metrics:
            assert isinstance(metric[1], int)  # timestamp
            assert isinstance(metric[2], (int, float, long))  # value
            metric_tags = metric[3]["tags"]
            for required in required_tags:
                assert required in metric_tags

        def assert_key_present(expected, present, tolerance):
            "Assert we have the rest of the keys (with some tolerance for missing keys)"
            wanted = set(expected)
            found = set(present)
            missing = wanted - found
            assert len(missing) < tolerance * len(wanted), pprint.pformat((found, missing))

        # gauges collected?
        reported_names = [metric[0] for metric in metrics]
        assert_key_present(r.GAUGE_KEYS.values(), reported_names, MISSING_KEY_TOLERANCE)

        # Assert that the keys metrics are tagged by db. Only db14 is
        # inspected, since it is the db this test has just populated.
        per_db_names = ["redis.keys", "redis.expires"]
        db_metrics = self._sort_metrics([
            metric for metric in metrics
            if metric[0] in per_db_names and "redis_db:db14" in metric[3]["tags"]
        ])
        self.assertEquals(2, len(db_metrics))

        expires_metric, keys_metric = db_metrics[0], db_metrics[1]
        self.assertEquals("redis.expires", expires_metric[0])
        self.assertEquals(1, expires_metric[2])

        self.assertEquals("redis.keys", keys_metric[0])
        self.assertEquals(3, keys_metric[2])

        # Run one more check and ensure we get total command count
        # and other rates
        time.sleep(5)
        r.check(instance)
        metrics = self._sort_metrics(r.get_metrics())
        assert "redis.net.commands" in [metric[0] for metric in metrics]
开发者ID:kolencherry,项目名称:dd-agent,代码行数:60,代码来源:test_redis.py


示例7: test_config_parser

    def test_config_parser(self):
        """Check the values get_instance_config derives from two instance dicts."""
        check = load_check(self.CHECK_NAME, {}, {})

        # Instance with credentials, tags and no explicit port or timeout
        full_instance = {
            "username": "user",
            "password": "pass",
            "is_external": "yes",
            "url": "http://foo.bar",
            "tags": ["a", "b:c"],
        }
        parsed = check.get_instance_config(full_instance)
        for attribute, wanted in (
            ("username", "user"),
            ("password", "pass"),
            ("is_external", True),
            ("url", "http://foo.bar"),
            ("tags", ["url:http://foo.bar", "a", "b:c"]),
            ("timeout", check.DEFAULT_TIMEOUT),
            ("service_check_tags", ["host:foo.bar", "port:None"]),
        ):
            self.assertEquals(getattr(parsed, attribute), wanted)

        # Instance with only a url (including a port) and a timeout
        minimal_instance = {
            "url": "http://192.168.42.42:12999",
            "timeout": 15
        }
        parsed = check.get_instance_config(minimal_instance)
        for attribute, wanted in (
            ("username", None),
            ("password", None),
            ("is_external", False),
            ("url", "http://192.168.42.42:12999"),
            ("tags", ["url:http://192.168.42.42:12999"]),
            ("timeout", 15),
            ("service_check_tags", ["host:192.168.42.42", "port:12999"]),
        ):
            self.assertEquals(getattr(parsed, attribute), wanted)
开发者ID:Joeasaurus,项目名称:dd-agent,代码行数:33,代码来源:test_elastic.py


示例8: test_build_event

    def test_build_event(self):
        """Check that the teamcity check emits one event, and only for a newer build."""
        agent_config = {'version': '0.1', 'api_key': 'toto'}
        check = load_check('teamcity', CONFIG, agent_config)

        # First run, with requests.get replaced by the "first build" mock
        with patch('requests.get', get_mock_first_build):
            check.check(check.instances[0])

        self.assertEquals(len(check.get_metrics()), 0)

        # Nothing should have happened because we only create events
        # for newer builds
        self.assertEquals(len(check.get_events()), 0)

        # Second run, with the "one more build" mock: exactly one event
        with patch('requests.get', get_mock_one_more_build):
            check.check(check.instances[0])

        events = check.get_events()
        self.assertEquals(len(events), 1)
        event = events[0]
        self.assertEquals(event['msg_title'], "Build for One test build successful")
        self.assertEquals(event['msg_text'], "Build Number: 2\nDeployed To: buildhost42.dtdg.co\n\nMore Info: http://localhost:8111/viewLog.html?buildId=2&buildTypeId=TestProject_TestBuild")
        self.assertEquals(event['tags'], ['build', 'one:tag', 'one:test'])
        self.assertEquals(event['host'], "buildhost42.dtdg.co")

        # One more check should not create any more events
        with patch('requests.get', get_mock_one_more_build):
            check.check(check.instances[0])

        self.assertEquals(len(check.get_events()), 0)
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:35,代码来源:test_teamcity.py


示例9: test_redis_repl

    def test_redis_repl(self):
        """Check that the redisdb check reports replication metrics for the master.

        Writes a key through the master connection, reads it back through the
        slave connection to confirm replication works, then runs the check
        against the master instance and requires every name in
        ``repl_metrics`` to be among the reported metric names.
        """
        master_instance = {
            'host': 'localhost',
            'port': NOAUTH_PORT
        }

        slave_instance = {
            'host': 'localhost',
            'port': AUTH_PORT,
            'password': 'datadog-is-devops-best-friend'
        }

        # 'redis.replication.delay' used to be listed twice; once is enough
        repl_metrics = [
            'redis.replication.delay',
            'redis.replication.backlog_histlen',
            'redis.replication.master_repl_offset',
        ]

        master_db = redis.Redis(port=NOAUTH_PORT, db=14)
        slave_db = redis.Redis(port=AUTH_PORT, password=slave_instance['password'], db=14)
        master_db.flushdb()

        # Assert that the replication works
        master_db.set('replicated:test', 'true')
        self.assertEquals(slave_db.get('replicated:test'), 'true')

        r = load_check('redisdb', {}, {})
        r.check(master_instance)
        metrics = self._sort_metrics(r.get_metrics())

        # Assert the presence of replication metrics. Asserting on a list of
        # booleans would always pass (a non-empty list is truthy), so collect
        # the names that are actually missing and require that there are none.
        keys = [m[0] for m in metrics]
        missing = [name for name in repl_metrics if name not in keys]
        assert not missing, "Missing replication metrics: %s" % missing
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:34,代码来源:test_redis.py


示例10: test_nginx_plus

 def test_nginx_plus(self):
     """Compare the sorted output of nginx.parse_json with the stored expected result."""
     raw_status = read_data_from_file('nginx_plus_in.json')
     # NOTE: eval() executes whatever the fixture file contains; this is only
     # acceptable for trusted test data
     expected_output = eval(read_data_from_file('nginx_plus_out.python'))
     check = load_check('nginx', self.config, self.agent_config)
     actual_output = check.parse_json(raw_status)
     actual_output.sort()
     self.assertEquals(actual_output, expected_output)
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:7,代码来源:test_nginx.py


示例11: test_check_real_process

    def test_check_real_process(self):
        """Check that we detect python running (at least this process).

        Runs the process check with a non-exact "python" search string and
        requires metrics, at least one OK service check, and no WARNING or
        CRITICAL service checks.
        """
        config = {
            'instances': [{"name": "py",
                           "search_string": ["python"],
                           "exact_match": False,
                           "ignored_denied_access": True,
                           "thresholds": {"warning": [1, 10], "critical": [1, 100]},
                       }]
        }

        self.agentConfig = {
            'version': '0.1',
            'api_key': 'toto'
        }

        self.check = load_check('process', config, self.agentConfig)
        self.check.check(config['instances'][0])
        python_metrics = self.check.get_metrics()
        service_checks = self.check.get_service_checks()
        assert service_checks
        self.assertTrue(len(python_metrics) > 0)

        # system.process.number >= 1
        # Compare every reported value individually: comparing the list itself
        # with 1 is always true on Python 2 and a TypeError on Python 3.
        # NOTE(review): this passes vacuously if no metric carries exactly this
        # name -- confirm the metric name against the process check.
        process_counts = [m[2] for m in python_metrics if m[0] == "system.process.number"]
        self.assertTrue(all(count >= 1 for count in process_counts), process_counts)

        self.assertTrue(len([t for t in service_checks if t['status']== AgentCheck.OK]) > 0, service_checks)
        self.assertEquals(len([t for t in service_checks if t['status']== AgentCheck.WARNING]),  0, service_checks)
        self.assertEquals(len([t for t in service_checks if t['status']== AgentCheck.CRITICAL]), 0, service_checks)
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:27,代码来源:test_process.py


示例12: test_check

    def test_check(self):
        """Run the process check over the built instances and tally service-check statuses."""
        config = {'init_config': {}, 'instances': []}
        self.agentConfig = {'version': '0.1', 'api_key': 'toto'}

        self.check = load_check('process', config, self.agentConfig)

        # The instances come from build_config; pid lookup is replaced by
        # this test case's own find_pids
        config = self.build_config(config)
        self.check.find_pids = self.find_pids

        # One pass over all instances per entry of nb_procs, bumping the
        # offset after each pass
        for _ in self.nb_procs:
            for instance in config['instances']:
                self.check.check(instance)

            self.offset += 1

        service_checks = self.check.get_service_checks()

        assert service_checks

        self.assertTrue(type(service_checks) is list)
        self.assertTrue(len(service_checks) > 0)
        # Expected number of service checks per status value
        for status, expected_count in ((0, 12), (1, 6), (2, 22)):
            with_status = [t for t in service_checks if t['status'] == status]
            self.assertEquals(len(with_status), expected_count, service_checks)
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:33,代码来源:test_process.py


示例13: test_service_checks

    def test_service_checks(self):
        """Expect one OK and one CRITICAL service check from two gearmand instances."""
        reachable = {'host': '127.0.0.1', 'port': 4730}
        failing = {'host': '127.0.0.1', 'port': 4731}
        config = {'instances': [reachable, failing]}
        agentConfig = {'version': '0.1', 'api_key': 'toto'}

        self.check = load_check('gearmand', config, agentConfig)
        # First instance must run cleanly, second one must raise
        self.check.check(config['instances'][0])
        self.assertRaises(Exception, self.check.check, config['instances'][1])

        service_checks = self.check.get_service_checks()
        self.assertEqual(len(service_checks), 2)

        # Service checks are expected in the order the instances were run
        expected_statuses = (AgentCheck.OK, AgentCheck.CRITICAL)
        for svc_check, expected_status in zip(service_checks, expected_statuses):
            self.assertEqual(svc_check['check'], self.check.SERVICE_CHECK_NAME)
            self.assertEqual(svc_check['status'], expected_status)
开发者ID:jonathonwiebe,项目名称:dd-agent,代码行数:25,代码来源:test_gearman.py


示例14: testTomcatMetrics

    def testTomcatMetrics(self):
        """Start two Tomcat servers and time the tomcat check on each configured instance.

        Skipped unconditionally for now: the SkipTest below fires first.
        """
        raise SkipTest()
        # NOTE: nothing below runs while the SkipTest above is in place.
        agentConfig = {
            'tomcat_jmx_instance_1': 'localhost:8090:first_instance',
            'tomcat_jmx_instance_2': 'dummyurl:4444:fake_url',
            # NOTE(review): "[email protected]" looks like an e-mail-obfuscation
            # placeholder rather than a real password@host value -- restore
            # the original string before relying on this instance.
            'tomcat_jmx_instance_3': 'monitorRole:[email protected]:8091:second_instance_with_auth',
            'version': '0.1',
            'api_key': 'toto'
        }

        config = JmxCheck.parse_agent_config(agentConfig, 'tomcat')
        config['init_config'] = TOMCAT_CONFIG
        metrics_check = load_check('tomcat', config, agentConfig)

        tomcat6 = '/tmp/apache-tomcat-6/bin'
        tomcat7 = '/tmp/apache-tomcat-7/bin'
        self.start_tomcat(tomcat6, 8080)
        self.start_tomcat(tomcat7, 7070)

        # Wall-clock duration of each successful check run
        timers_first_check = []

        for instance in config['instances']:
            try:
                start = time.time()
                metrics_check.check(instance)
                timers_first_check.append(time.time() - start)
            except Exception as e:
                # Best effort: report the failure and move on to the next
                # instance. "as e" / print(e) parse on Python 2.6+ and 3.
                print(e)
                continue
开发者ID:ovesh,项目名称:dd-agent,代码行数:30,代码来源:test_jmx.py


示例15: testJavaMetric

    def testJavaMetric(self):
        """Start Tomcat and Solr and time the jmx check on each configured instance.

        Skipped unconditionally for now: the SkipTest below fires first.
        """
        raise SkipTest()
        # NOTE: nothing below runs while the SkipTest above is in place.
        agentConfig = {
            'java_jmx_instance_1': 'localhost:8090',
            'java_jmx_instance_2': 'dummyhost:9999:dummy',
            'java_jmx_instance_3': 'localhost:2222:second_instance',
            'version': '0.1',
            'api_key': 'toto'
        }

        config = JmxCheck.parse_agent_config(agentConfig, 'java')

        metrics_check = load_check('jmx', config, agentConfig)

        # Starting tomcat
        tomcat6 = '/tmp/apache-tomcat-6/bin'
        self.start_tomcat(tomcat6, 8080)

        # Starting solr
        jmx_prefix = "-Dcom.sun.management.jmxremote"
        first_instance = "%s.port=2222 %s.authenticate=false -Djetty.port=8380" % (jmx_prefix, jmx_prefix)
        # The name is rebound to whatever start_solr returns
        first_instance = self.start_solr(first_instance, 8983)

        # Wall-clock duration of each successful check run
        timers_first_check = []

        for instance in config['instances']:
            try:
                start = time.time()
                metrics_check.check(instance)
                timers_first_check.append(time.time() - start)
            except Exception as e:
                # Best effort: report the failure and move on to the next
                # instance. "as e" / print(e) parse on Python 2.6+ and 3.
                print(e)
                continue
开发者ID:ovesh,项目名称:dd-agent,代码行数:34,代码来源:test_jmx.py


示例16: test_invalid_metric

    def test_invalid_metric(self):
        """Expect the snmp check to raise and emit one CRITICAL service check for a bad table."""
        known_table = {
            "MIB": "IF-MIB",
            "table": "ifTable",
            "symbols": ["ifInOctets", "ifOutOctets"],
        }
        unknown_table = {
            "MIB": "IF-MIB",
            "table": "noIdeaWhatIAmDoingHere",
            "symbols": ["ifInOctets", "ifOutOctets"],
        }
        self.config = {
            "instances": [{
                "ip_address": "localhost",
                "port": 161,
                "community_string": "public",
                "metrics": [known_table, unknown_table],
            }]
        }

        self.check = load_check('snmp', self.config, self.agentConfig)

        # Make it fail faster
        self.check.RETRIES = 0
        self.check.TIMEOUT = 0.5

        # We expect: No symbol IF-MIB::noIdeaWhatIAmDoingHere
        self.assertRaises(Exception, self.check.check, self.config['instances'][0])

        # Service checks: keep only the snmp ones
        snmp_service_checks = [
            sc for sc in self.check.get_service_checks()
            if sc['check'].startswith('snmp')
        ]
        self.assertEquals(len(snmp_service_checks), 1, snmp_service_checks)
        for service_check in snmp_service_checks:
            self.assertEquals(service_check['status'], self.check.CRITICAL, service_check)
            self.assertEquals(service_check['tags'], ['snmp_device:localhost'], service_check)
开发者ID:sirlantis,项目名称:dd-agent,代码行数:35,代码来源:test_snmp.py


示例17: testSNMPCheck

    def testSNMPCheck(self):
        """Run the snmp check twice: first only the gauge is expected, then gauge and rate."""
        self.check = load_check('snmp', self.config, self.agentConfig)

        self.check.check(self.config['instances'][0])
        first_run = self.check.get_metrics()

        # Assert that there is only the gauge metric because the counter is used
        # as a rate so we don't report with 1 point
        self.assertEqual(len(first_run), 1)
        self.assertEqual(first_run[0][0], 'snmp.tcpCurrEstab')

        # Sleep for 1 second so the rate interval >=1
        time.sleep(1)
        # Run the check again so we get the rate
        self.check.check(self.config['instances'][0])
        second_run = self.check.get_metrics()

        self.assertEqual(len(second_run), 2)
        # Both metric names must now be present
        for expected_name in ['snmp.udpDatagrams', 'snmp.tcpCurrEstab']:
            self.assertTrue(any(result[0] == expected_name for result in second_run))
开发者ID:hungld,项目名称:dd-agent,代码行数:27,代码来源:test_snmp.py


示例18: test_collector

    def test_collector(self):
        """Run a collector with one checks.d check and look for its timing metric tag."""
        agentConfig = {
            'api_key': 'test_apikey',
            'check_timings': True,
            'collect_ec2_tags': True,
            'collect_instance_metadata': False,
            'version': 'test',
            'tags': '',
        }

        # Run a single checks.d check as part of the collector.
        redis_instance = {"host": "localhost", "port": 6379}
        redis_config = {"init_config": {}, "instances": [redis_instance]}
        checks = [load_check('redisdb', redis_config, agentConfig)]

        collector = Collector(agentConfig, [], {}, get_hostname(agentConfig))
        checks_to_run = {
            'initialized_checks': checks,
            'init_failed_checks': {}
        }
        payload = collector.run(checks_to_run)
        metrics = payload['metrics']

        # Check that we got a timing metric for all checks.
        timing_metrics = [
            metric for metric in metrics
            if metric[0] == 'datadog.agent.check_run_time'
        ]
        all_tags = [tag for metric in timing_metrics for tag in metric[3]['tags']]
        for check in checks:
            expected_tag = "check:%s" % check.name
            assert expected_tag in all_tags, all_tags
开发者ID:Joeasaurus,项目名称:dd-agent,代码行数:33,代码来源:test_common.py


示例19: test_apache

    def test_apache(self):
        """Run the apache check on two status urls and verify metric and service-check tags."""
        agent_config = {'version': '0.1', 'api_key': 'toto'}
        first_instance = {
            'apache_status_url': 'http://localhost:8080/server-status',
            'tags': ['instance:first']
        }
        second_instance = {
            'apache_status_url': 'http://localhost:8080/server-status?auto',
            'tags': ['instance:second']
        }
        config = {
            'init_config': {},
            'instances': [first_instance, second_instance],
        }
        check = load_check('apache', config, agent_config)

        # After each run the first metric must carry that instance's tags
        expected_tags_per_run = (['instance:first'], ['instance:second'])
        for instance, expected_tags in zip(config['instances'], expected_tags_per_run):
            check.check(instance)
            metrics = check.get_metrics()
            self.assertEquals(metrics[0][3].get('tags'), expected_tags)

        # Every apache.can_connect service check is tagged with host and port
        service_checks = check.get_service_checks()
        can_connect = [sc for sc in service_checks if sc['check'] == 'apache.can_connect']
        for service_check in can_connect:
            self.assertEquals(set(service_check['tags']), set(['host:localhost', 'port:8080']), service_checks)
开发者ID:jonathonwiebe,项目名称:dd-agent,代码行数:32,代码来源:test_apache.py


示例20: test_table_SNMPCheck

    def test_table_SNMPCheck(self):
        """Run the snmp check on a table metric and verify metric names, tags and service checks."""
        metric_tags = [
            {"tag": "interface", "column": "ifDescr"},
            {"tag": "dumbindex", "index": 1},
        ]
        table_metric = {
            "MIB": "IF-MIB",
            "table": "ifTable",
            "symbols": ["ifInOctets", "ifOutOctets"],
            "metric_tags": metric_tags,
        }
        self.config = {
            "instances": [{
                "ip_address": "localhost",
                "port": 161,
                "community_string": "public",
                "metrics": [table_metric],
            }]
        }

        self.check = load_check('snmp', self.config, self.agentConfig)

        self.check.check(self.config['instances'][0])
        # Sleep for 1 second so the rate interval >=1
        time.sleep(1)
        # Run the check again so that we get the rates
        self.check.check(self.config['instances'][0])
        metrics = self.check.get_metrics()
        # nb of metrics depends on the nb of interfaces on the test machine
        # so it's not possible to specify an exact number
        self.assertTrue(len(metrics)>0, "No metrics")
        for metric in metrics:
            metric_name = metric[0]
            self.assertTrue(metric_name in ['snmp.ifInOctets', 'snmp.ifOutOctets'],
                            metric_name)
            tags = metric[3]['tags']
            # Assert that all the wanted tags are here
            self.assertEquals(len(tags), 3, tags)
            tag_group_expected = ["snmp_device", "dumbindex", "interface"]
            for tag in tags:
                tag_parts = tag.split(":")
                tag_group = tag_parts[0]
                self.assertTrue(tag_group in tag_group_expected, tag_group)
                if tag_group != "interface":
                    continue
                interface_type = tag_parts[1]
                try:
                    float(interface_type)
                except ValueError:
                    # Not parseable as a number: accepted as pretty printed
                    continue
                self.fail("Tag discovered not pretty printed %s" % interface_type)

        # Service checks: keep only the snmp ones
        snmp_service_checks = [
            sc for sc in self.check.get_service_checks()
            if sc['check'].startswith('snmp')
        ]
        # We run the check twice
        self.assertEquals(len(snmp_service_checks), 2, snmp_service_checks)
        for service_check in snmp_service_checks:
            self.assertEquals(service_check['status'], self.check.OK, service_check)
            self.assertEquals(service_check['tags'], ['snmp_device:localhost'], service_check)
开发者ID:sirlantis,项目名称:dd-agent,代码行数:60,代码来源:test_snmp.py



注:本文中的tests.common.load_check函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python common.load_fixture函数代码示例发布时间:2022-05-27
下一篇:
Python common.init_recorder_component函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap