• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java MetricsReporter类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.common.metrics.MetricsReporter的典型用法代码示例。如果您正苦于以下问题:Java MetricsReporter类的具体用法?Java MetricsReporter怎么用?Java MetricsReporter使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



MetricsReporter类属于org.apache.kafka.common.metrics包,在下文中一共展示了MetricsReporter类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: close

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public void close() throws IOException {
    try {
        kafkaClient.close();
    } finally {
        for (MetricsReporter metricsReporter: this.reporters) {
            metricsReporter.close();
        }
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:10,代码来源:StreamsKafkaClient.java


示例2: MockProcessorContext

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public MockProcessorContext(final File stateDir,
                            final Serde<?> keySerde,
                            final Serde<?> valSerde,
                            final RecordCollector.Supplier collectorSupplier,
                            final ThreadCache cache) {
    this.stateDir = stateDir;
    this.keySerde = keySerde;
    this.valSerde = valSerde;
    recordCollectorSupplier = collectorSupplier;
    metrics = new Metrics(new MetricConfig(), Collections.singletonList((MetricsReporter) new JmxReporter()), new MockTime(), true);
    this.cache = cache;
    streamsMetrics = new MockStreamsMetrics(metrics);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:MockProcessorContext.java


示例3: testUnused

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
@Test
public void testUnused() {
    Properties props = new Properties();
    String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    props.put(FakeMetricsReporterConfig.EXTRA_CONFIG, "my_value");
    TestConfig config = new TestConfig(props);

    assertTrue("metric.extra_config should be marked unused before getConfiguredInstances is called",
        config.unused().contains(FakeMetricsReporterConfig.EXTRA_CONFIG));

    config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    assertTrue("All defined configurations should be marked as used", config.unused().isEmpty());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:15,代码来源:AbstractConfigTest.java


示例4: testValidInputs

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
private void testValidInputs(String configValue) {
    Properties props = new Properties();
    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    TestConfig config = new TestConfig(props);
    try {
        config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
    } catch (ConfigException e) {
        fail("No exceptions are expected here, valid props are :" + props);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:11,代码来源:AbstractConfigTest.java


示例5: testInvalidInputs

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
private void testInvalidInputs(String configValue) {
    Properties props = new Properties();
    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
    TestConfig config = new TestConfig(props);
    try {
        config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        fail("Expected a config exception due to invalid props :" + props);
    } catch (KafkaException e) {
        // this is good
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:12,代码来源:AbstractConfigTest.java


示例6: KafkaMonitor

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public KafkaMonitor(Map<String, Map> testProps) throws Exception {
  _apps = new ConcurrentHashMap<>();
  _services = new ConcurrentHashMap<>();

  for (Map.Entry<String, Map> entry : testProps.entrySet()) {
    String name = entry.getKey();
    Map props = entry.getValue();
    if (!props.containsKey(CLASS_NAME_CONFIG))
      throw new IllegalArgumentException(name + " is not configured with " + CLASS_NAME_CONFIG);
    String className = (String) props.get(CLASS_NAME_CONFIG);

    Class<?> cls = Class.forName(className);
    if (App.class.isAssignableFrom(cls)) {
      App test = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
      _apps.put(name, test);
    } else if (Service.class.isAssignableFrom(cls)) {
      Service service = (Service) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
      _services.put(name, service);
    } else {
      throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
    }
  }
  _executor = Executors.newSingleThreadScheduledExecutor();
  _offlineRunnables = new ConcurrentHashMap<>();
  List<MetricsReporter> reporters = new ArrayList<>();
  reporters.add(new JmxReporter(JMX_PREFIX));
  Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
  metrics.addMetric(metrics.metricName("offline-runnable-count", METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
      new Measurable() {
        @Override
        public double measure(MetricConfig config, long now) {
          return _offlineRunnables.size();
        }
      }
  );
}
 
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:37,代码来源:KafkaMonitor.java


示例7: WorkerGroupMember

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public WorkerGroupMember(DistributedConfig config,
                         String restUrl,
                         ConfigBackingStore configStorage,
                         WorkerRebalanceListener listener,
                         Time time) {
    try {
        this.time = time;

        String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
        clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
        Map<String, String> metricsTags = new LinkedHashMap<>();
        metricsTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
        String metricGrpPrefix = "connect";
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                time,
                true,
                new ApiVersions());
        this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
        this.coordinator = new WorkerCoordinator(this.client,
                config.getString(DistributedConfig.GROUP_ID_CONFIG),
                config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
                config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
                config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                restUrl,
                configStorage,
                listener);

        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Connect group member created");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        stop(true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:63,代码来源:WorkerGroupMember.java


示例8: StreamsKafkaClient

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public StreamsKafkaClient(final Config streamsConfig) {
    this.streamsConfig = streamsConfig;

    final Time time = new SystemTime();

    final Map<String, String> metricTags = new LinkedHashMap<>();
    metricTags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);

    final Metadata metadata = new Metadata(streamsConfig.getLong(
        StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
        streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
        false
    );
    final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
    metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());

    final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
            .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
            .tags(metricTags);
    reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
            MetricsReporter.class);
    // TODO: This should come from the KafkaStream
    reporters.add(new JmxReporter("kafka.admin"));
    final Metrics metrics = new Metrics(metricConfig, reporters, time);

    final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);

    final Selector selector = new Selector(
        streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
        metrics,
        time,
        "kafka-client",
        channelBuilder);

    kafkaClient = new NetworkClient(
        selector,
        metadata,
        streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
        MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
        streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
        streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
        streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
        streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
        time,
        true,
        new ApiVersions());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:49,代码来源:StreamsKafkaClient.java


示例9: checkInstances

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) {
    assertEquals(expectedClassPropClass, getConfiguredInstance("class.prop", MetricsReporter.class).getClass());
    List<?> list = getConfiguredInstances("list.prop", MetricsReporter.class);
    for (int i = 0; i < list.size(); i++)
        assertEquals(expectedListPropClasses[i], list.get(i).getClass());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:7,代码来源:AbstractConfigTest.java


示例10: ConsumeService

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public ConsumeService(Map<String, Object> props, String name) throws Exception {
  _name = name;
  Map consumerPropsOverride = props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)
    ? (Map) props.get(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG) : new HashMap<>();
  ConsumeServiceConfig config = new ConsumeServiceConfig(props);
  String topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG);
  String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
  String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
  String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
  _latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG);
  _latencyPercentileMaxMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
  _latencyPercentileGranularityMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
  _running = new AtomicBoolean(false);

  for (String property: NONOVERRIDABLE_PROPERTIES) {
    if (consumerPropsOverride.containsKey(property)) {
      throw new ConfigException("Override must not contain " + property + " config.");
    }
  }

  Properties consumerProps = new Properties();

  // Assign default config. This has the lowest priority.
  consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "kmf-consumer");
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kmf-consumer-group-" + new Random().nextInt());
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

  if (consumerClassName.equals(NewConsumer.class.getCanonicalName()) || consumerClassName.equals(NewConsumer.class.getSimpleName())) {
    consumerClassName = NewConsumer.class.getCanonicalName();
  } else if (consumerClassName.equals(OldConsumer.class.getCanonicalName()) || consumerClassName.equals(OldConsumer.class.getSimpleName())) {
    consumerClassName = OldConsumer.class.getCanonicalName();
    // The name/value of these configs are changed in the new consumer.
    consumerProps.put("auto.commit.enable", "false");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
  }

  // Assign config specified for ConsumeService.
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  consumerProps.put("zookeeper.connect", zkConnect);

  // Assign config specified for consumer. This has the highest priority.
  consumerProps.putAll(consumerPropsOverride);

  _consumer = (KMBaseConsumer) Class.forName(consumerClassName).getConstructor(String.class, Properties.class).newInstance(topic, consumerProps);

  _thread = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        consume();
      } catch (Exception e) {
        LOG.error(_name + "/ConsumeService failed", e);
      }
    }
  }, _name + " consume-service");
  _thread.setDaemon(true);

  MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
  List<MetricsReporter> reporters = new ArrayList<>();
  reporters.add(new JmxReporter(JMX_PREFIX));
  Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
  Map<String, String> tags = new HashMap<>();
  tags.put("name", _name);
  _sensors = new ConsumeMetrics(metrics, tags);
}
 
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:69,代码来源:ConsumeService.java


示例11: ProduceService

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public ProduceService(Map<String, Object> props, String name) throws Exception {
  _name = name;
  ProduceServiceConfig config = new ProduceServiceConfig(props);
  _zkConnect = config.getString(ProduceServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
  _brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
  String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);

  _partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);
  _threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG);
  _topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG);
  _producerId = config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG);
  _produceDelayMs = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_DELAY_MS_CONFIG);
  _recordSize = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_SIZE_BYTE_CONFIG);
  _sync = config.getBoolean(ProduceServiceConfig.PRODUCE_SYNC_CONFIG);
  _treatZeroThroughputAsUnavailable = config.getBoolean(ProduceServiceConfig.PRODUCER_TREAT_ZERO_THROUGHPUT_AS_UNAVAILABLE_CONFIG);
  _partitionNum = new AtomicInteger(0);
  _running = new AtomicBoolean(false);
  _nextIndexPerPartition = new ConcurrentHashMap<>();
  _producerPropsOverride = props.containsKey(ProduceServiceConfig.PRODUCER_PROPS_CONFIG)
    ? (Map) props.get(ProduceServiceConfig.PRODUCER_PROPS_CONFIG) : new HashMap<>();

  for (String property: NONOVERRIDABLE_PROPERTIES) {
    if (_producerPropsOverride.containsKey(property)) {
      throw new ConfigException("Override must not contain " + property + " config.");
    }
  }

  if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) {
    _producerClassName = NewProducer.class.getCanonicalName();
  } else {
    _producerClassName = producerClass;
  }

  initializeProducer();

  _produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory());
  _handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory());

  MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
  List<MetricsReporter> reporters = new ArrayList<>();
  reporters.add(new JmxReporter(JMX_PREFIX));
  Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
  Map<String, String> tags = new HashMap<>();
  tags.put("name", _name);
  _sensors = new ProduceMetrics(metrics, tags);
}
 
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:47,代码来源:ProduceService.java


示例12: initialize

import org.apache.kafka.common.metrics.MetricsReporter; //导入依赖的package包/类
public static void initialize() {
  MetricConfig metricConfig = new MetricConfig().samples(100).timeWindow(1000, TimeUnit.MILLISECONDS);
  List<MetricsReporter> reporters = new ArrayList<>();
  reporters.add(new JmxReporter("io.confluent.ksql.metrics"));
  metrics = new Metrics(metricConfig, reporters, new SystemTime());
}
 
开发者ID:confluentinc,项目名称:ksql,代码行数:7,代码来源:MetricCollectors.java



注:本文中的org.apache.kafka.common.metrics.MetricsReporter类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Event类代码示例发布时间:2022-05-22
下一篇:
Java MapVariableResolverFactory类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap