• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java MetricsRegistry类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.samza.metrics.MetricsRegistry的典型用法代码示例。如果您正苦于以下问题:Java MetricsRegistry类的具体用法?Java MetricsRegistry怎么用?Java MetricsRegistry使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



MetricsRegistry类属于org.apache.samza.metrics包,在下文中一共展示了MetricsRegistry类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testStartFailsOnTopicCreationErrors

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test(expected = TopicAlreadyMarkedForDeletionException.class)
public void testStartFailsOnTopicCreationErrors() {

  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  // create an admin that throws an exception during createStream
  SystemAdmin mockAdmin = newAdmin("0", "10");
  doThrow(new TopicAlreadyMarkedForDeletionException("invalid stream")).when(mockAdmin).createStream(checkpointSpec);

  SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());

  // expect an exception during startup
  checkpointManager.start();
}
 
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:TestKafkaCheckpointManagerJava.java


示例2: testStartFailsOnTopicValidationErrors

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test(expected = StreamValidationException.class)
public void testStartFailsOnTopicValidationErrors() {

  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);

  // create an admin that throws an exception during validateStream
  SystemAdmin mockAdmin = newAdmin("0", "10");
  doThrow(new StreamValidationException("invalid stream")).when(mockAdmin).validateStream(checkpointSpec);

  SystemFactory factory = newFactory(mock(SystemProducer.class), mock(SystemConsumer.class), mockAdmin);
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mock(Config.class), mock(MetricsRegistry.class), null, new KafkaCheckpointLogKeySerde());

  // expect an exception during startup
  checkpointManager.start();
}
 
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestKafkaCheckpointManagerJava.java


示例3: testReadFailsOnSerdeExceptions

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test(expected = SamzaException.class)
public void testReadFailsOnSerdeExceptions() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
  SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);

  SystemAdmin mockAdmin = newAdmin("0", "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // expect an exception from ExceptionThrowingSerde
  checkpointManager.readLastCheckpoint(TASK1);
}
 
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestKafkaCheckpointManagerJava.java


示例4: testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test
public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
  SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);

  SystemAdmin mockAdmin = newAdmin("0", "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      false, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
      new ExceptionThrowingCheckpointKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // expect the read to succeed inspite of the exception from ExceptionThrowingSerde
  checkpointManager.readLastCheckpoint(TASK1);
}
 
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TestKafkaCheckpointManagerJava.java


示例5: EventHubSystemProducer

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public EventHubSystemProducer(EventHubConfig config, String systemName,
                              EventHubClientManagerFactory eventHubClientManagerFactory,
                              Map<String, Interceptor> interceptors, MetricsRegistry registry) {
  this.config = config;
  this.registry = registry;
  this.systemName = systemName;
  this.partitioningMethod = config.getPartitioningMethod(systemName);
  this.interceptors = interceptors;

  // Fetches the stream ids
  List<String> streamIds = config.getStreams(systemName);

  // Create and initiate connections to Event Hubs
  for (String streamId : streamIds) {
    EventHubClientManager ehClient = eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config);
    eventHubClients.put(streamId, ehClient);
    ehClient.init();
  }
}
 
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:EventHubSystemProducer.java


示例6: ZkJobCoordinator

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
  this.config = config;

  this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);

  this.processorId = createProcessorId(config);
  this.zkUtils = zkUtils;
  // setup a listener for a session state change
  // we are mostly interested in "session closed" and "new session created" events
  zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
  LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
  leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
  this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
  this.barrier =  new ZkBarrierForVersionUpgrade(
      zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
      zkUtils,
      new ZkBarrierListenerImpl());
  this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
  this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
  debounceTimer = new ScheduleAfterDebounceTime();
  debounceTimer.setScheduledTaskCallback(throwable -> {
      LOG.error("Received exception from in JobCoordinator Processing!", throwable);
      stop();
    });
}
 
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:ZkJobCoordinator.java


示例7: StreamPartitionCountMonitor

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
/**
 * Default constructor.
 *
 * @param streamsToMonitor  a set of SystemStreams to monitor.
 * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
 * @param metrics           the metrics registry to which the metrics should be added.
 * @param monitorPeriodMs   the period at which the monitor will run in milliseconds.
 * @param monitorCallback   the callback method to be invoked when partition count changes are detected
 */
public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
    MetricsRegistry metrics, int monitorPeriodMs, Callback monitorCallback) {
  this.streamsToMonitor = streamsToMonitor;
  this.metadataCache = metadataCache;
  this.monitorPeriodMs = monitorPeriodMs;
  this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
  this.callbackMethod = monitorCallback;

  // Pre-populate the gauges
  Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
  for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
    SystemStream systemStream = metadataEntry.getKey();
    Gauge gauge = metrics.newGauge("job-coordinator",
        String.format("%s-%s-partitionCount", systemStream.getSystem(), systemStream.getStream()), 0);
    mutableGauges.put(systemStream, gauge);
  }
  gauges = Collections.unmodifiableMap(mutableGauges);
}
 
开发者ID:apache,项目名称:samza,代码行数:28,代码来源:StreamPartitionCountMonitor.java


示例8: getConsumer

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
/**
 * Returns a consumer that sends all configs to the coordinator stream.
 *
 * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
 *               The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
 *               ch:source:taskname -> changelogPartition for changelog
 *               Everything else is processed as normal config
 */
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {

  if (useCachedConsumer && mockConsumer != null) {
    return mockConsumer;
  }

  String jobName = config.get("job.name");
  String jobId = config.get("job.id");
  if (jobName == null) {
    throw new ConfigException("Must define job.name.");
  }
  if (jobId == null) {
    jobId = "1";
  }
  String streamName = Util.getCoordinatorStreamName(jobName, jobId);
  SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
  mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
  return mockConsumer;
}
 
开发者ID:apache,项目名称:samza,代码行数:28,代码来源:MockCoordinatorStreamSystemFactory.java


示例9: testPartitionCountMonitorWithDurableStates

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test
public void testPartitionCountMonitorWithDurableStates()
    throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
  configMap.put("stores.mystore.changelog", "mychangelog");
  Config config = new MapConfig(configMap);

  // mimic job runner code to write the config to coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);

  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);

  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());

  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.FAILED);
}
 
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:TestClusterBasedJobCoordinator.java


示例10: testPartitionCountMonitorWithoutDurableStates

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test
public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
  Config config = new MapConfig(configMap);

  // mimic job runner code to write the config to coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);

  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);

  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());

  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.UNDEFINED);
}
 
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestClusterBasedJobCoordinator.java


示例11: getConsumer

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
  return new SystemConsumer() {
    public void start() {
    }

    public void stop() {
    }

    public void register(SystemStreamPartition systemStreamPartition, String offset) {
      MSG_QUEUES.putIfAbsent(systemStreamPartition, new ArrayList<>());
    }

    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> retQueues = new HashMap<>();
      systemStreamPartitions.forEach(ssp -> {
          List<IncomingMessageEnvelope> msgs = MSG_QUEUES.get(ssp);
          if (msgs == null) {
            retQueues.put(ssp, new ArrayList<>());
          } else {
            retQueues.put(ssp, MSG_QUEUES.remove(ssp));
          }
        });
      return retQueues;
    }
  };
}
 
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:MockSystemFactory.java


示例12: TranquilitySystemProducerMetrics

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public TranquilitySystemProducerMetrics(String systemName, MetricsRegistry registry) {
  group = this.getClass().getName();
  this.registry = registry;
  this.systemName = systemName;

  MetricAdaptor adaptor = new MetricAdaptor(new MetricRegistry(), registry, group);

  bulkSendSuccess = newCounter("bulk-send-success");
  bulkSendBatchSize = newHistogram(adaptor, "bulk-send-batch-size");
  bulkSendWaitMs = newHistogram(adaptor, "bulk-send-wait-ms");
  triggerFlushCmd = newCounter("bulk-send-trigger-flush-cmd");
  triggerMaxRecords = newCounter("bulk-send-trigger-max-records");
  triggerMaxInterval = newCounter("bulk-send-trigger-max-interval");
  lagFromReceiveMs = newHistogram(adaptor, "lag-from-receive-ms");
  lagFromOriginMs = newHistogram(adaptor, "lag-from-origin-ms");
  sent = newCounter("sent");
  dropped = newCounter("dropped");
}
 
开发者ID:quantiply,项目名称:rico,代码行数:19,代码来源:TranquilitySystemProducerMetrics.java


示例13: ElasticsearchSystemProducerMetrics

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
    group = this.getClass().getName();
    this.registry = registry;
    this.systemName = systemName;

    MetricAdaptor adaptor = new MetricAdaptor(new MetricRegistry(), registry, group);
    
    bulkSendSuccess = newCounter("bulk-send-success");
    bulkSendBatchSize = newHistogram(adaptor, "bulk-send-batch-size");
    bulkSendWaitMs = newHistogram(adaptor, "bulk-send-wait-ms");
    triggerFlushCmd = newCounter("bulk-send-trigger-flush-cmd");
    triggerMaxActions = newCounter("bulk-send-trigger-max-actions");
    triggerMaxInterval = newCounter("bulk-send-trigger-max-interval");
    lagFromReceiveMs = newHistogram(adaptor, "lag-from-receive-ms");
    lagFromOriginMs = newHistogram(adaptor, "lag-from-origin-ms");
    inserts = newCounter("inserts");
    updates = newCounter("updates");
    deletes = newCounter("deletes");
    conflicts = newCounter("version-conflicts");
}
 
开发者ID:quantiply,项目名称:rico,代码行数:21,代码来源:ElasticsearchSystemProducerMetrics.java


示例14: getConsumer

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Override
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
  String host = config.get("systems." + systemName + ".host");
  int port = config.getInt("systems." + systemName + ".port");
  WikipediaFeed feed = new WikipediaFeed(host, port);

  return new WikipediaConsumer(systemName, feed, registry);
}
 
开发者ID:yoloanalytics,项目名称:bigdata-swamp,代码行数:9,代码来源:WikipediaSystemFactory.java


示例15: BlockingEnvelopeMap

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
  this(metricsRegistry, new Clock() {
    public long currentTimeMillis() {
      return System.currentTimeMillis();
    }
  });
}
 
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:BlockingEnvelopeMap.java


示例16: BlockingEnvelopeMapMetrics

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
  this.group = group;
  this.metricsRegistry = metricsRegistry;
  this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
  this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
  this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
  this.pollCount = metricsRegistry.newCounter(group, "poll-count");
}
 
开发者ID:apache,项目名称:samza,代码行数:9,代码来源:BlockingEnvelopeMap.java


示例17: SamzaContainerContext

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
/**
 * An immutable context object that can passed to tasks to give them information
 * about the container in which they are executing.
 * @param id The id of the container.
 * @param config The job configuration.
 * @param taskNames The set of taskName keys for which this container is responsible.
 * @param metricsRegistry the {@link MetricsRegistry} for the container metrics
 */
public SamzaContainerContext(
    String id,
    Config config,
    Collection<TaskName> taskNames,
    MetricsRegistry metricsRegistry) {
  this.id = id;
  this.config = config;
  this.taskNames = Collections.unmodifiableCollection(taskNames);
  this.metricsRegistry = metricsRegistry;
}
 
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:SamzaContainerContext.java


示例18: testCheckpointsAreReadFromOldestOffset

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test
public void testCheckpointsAreReadFromOldestOffset() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  SystemConsumer mockConsumer = newConsumer(ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0"))));

  String oldestOffset = "0";
  SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);

  // 1. verify that consumer.register is called only during checkpointManager.start.
  // 2. verify that consumer.register is called with the oldest offset.
  // 3. verify that no other operation on the CheckpointManager re-invokes register since start offsets are set during
  // register
  verify(mockConsumer, times(0)).register(CHECKPOINT_SSP, oldestOffset);
  checkpointManager.start();
  verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);

  checkpointManager.readLastCheckpoint(TASK1);
  verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
}
 
开发者ID:apache,项目名称:samza,代码行数:31,代码来源:TestKafkaCheckpointManagerJava.java


示例19: testAllMessagesInTheLogAreRead

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
@Test
public void testAllMessagesInTheLogAreRead() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));

  int oldestOffset = 0;
  int newestOffset = 10;

  // mock out a consumer that returns ten checkpoint IMEs for the same ssp
  List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
  for(int offset = oldestOffset; offset <= newestOffset; offset++) {
    pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
  }

  // return one message at a time from each poll simulating a KafkaConsumer with max.poll.records = 1
  SystemConsumer mockConsumer = newConsumer(pollOutputs);
  SystemAdmin mockAdmin = newAdmin(Integer.toString(oldestOffset), Integer.toString(newestOffset));
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // check that all ten messages are read, and the checkpoint is the newest message
  Checkpoint checkpoint = checkpointManager.readLastCheckpoint(TASK1);
  Assert.assertEquals(checkpoint.getOffsets(), ImmutableMap.of(ssp, Integer.toString(newestOffset)));
}
 
开发者ID:apache,项目名称:samza,代码行数:33,代码来源:TestKafkaCheckpointManagerJava.java


示例20: newFactory

import org.apache.samza.metrics.MetricsRegistry; //导入依赖的package包/类
private SystemFactory newFactory(SystemProducer producer, SystemConsumer consumer, SystemAdmin admin) {
  SystemFactory factory = mock(SystemFactory.class);
  when(factory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(producer);
  when(factory.getConsumer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(consumer);
  when(factory.getAdmin(anyString(), any(Config.class))).thenReturn(admin);
  return factory;
}
 
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:TestKafkaCheckpointManagerJava.java



注:本文中的org.apache.samza.metrics.MetricsRegistry类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java StreamingResolution类代码示例发布时间:2022-05-22
下一篇:
Java XSSimpleType类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap