
Java UserRecordResult Class Code Examples


This article collects typical usage examples of the Java class com.amazonaws.services.kinesis.producer.UserRecordResult. If you are wondering how the UserRecordResult class is used in practice, the selected examples below may help.



The UserRecordResult class belongs to the com.amazonaws.services.kinesis.producer package. The 20 code examples below are sorted by popularity by default.

Example 1: send

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
public void send(String event) throws UnsupportedEncodingException {
    byte[] bytes = event.getBytes("UTF-8");
    this.metrics.queueEvent(bytes.length);
    ByteBuffer data = ByteBuffer.wrap(bytes);
    String partitionKey = getPartitionKey(event);
    if (partitionKey != null) {
        ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                LOGGER.error("Exception during put", t);
            }

            @Override
            public void onSuccess(UserRecordResult result) {
                metrics.ackEvent();
            }
        });
    }
}
 
Developer ID: monetate, Project: koupler, Lines of code: 25, Source: KinesisEventProducer.java


Example 2: put

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void put(Collection<SinkRecord> sinkRecords) {

	// If the KinesisProducers cannot write to Kinesis Streams (because of
	// connectivity issues, access issues, or misconfigured shards), pause
	// consumption of messages until the backlog is cleared.

	validateOutStandingRecords();

	String partitionKey;
	for (SinkRecord sinkRecord : sinkRecords) {

		ListenableFuture<UserRecordResult> f;
		// Kinesis does not allow empty partition key
		if (sinkRecord.key() != null && !sinkRecord.key().toString().trim().equals("")) {
			partitionKey = sinkRecord.key().toString().trim();
		} else {
			partitionKey = Integer.toString(sinkRecord.kafkaPartition());
		}

		if (singleKinesisProducerPerPartition)
			f = addUserRecord(producerMap.get(sinkRecord.kafkaPartition() + "@" + sinkRecord.topic()), streamName,
					partitionKey, usePartitionAsHashKey, sinkRecord);
		else
			f = addUserRecord(kinesisProducer, streamName, partitionKey, usePartitionAsHashKey, sinkRecord);

		Futures.addCallback(f, callback);

	}
}
 
Developer ID: awslabs, Project: kinesis-kafka-connector, Lines of code: 32, Source: AmazonKinesisSinkTask.java
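
The partition-key fallback in Example 2 can be isolated into a small helper. The following is a minimal sketch using only the standard library: `PartitionKeys` and `choose` are hypothetical names introduced for illustration and are not part of kinesis-kafka-connector or the Kinesis Producer Library.

```java
// Sketch only: PartitionKeys and choose(...) are hypothetical names for illustration.
final class PartitionKeys {

    private PartitionKeys() {}

    /**
     * Returns the trimmed record key when it is non-blank; otherwise falls back
     * to the Kafka partition number, since Kinesis does not allow an empty
     * partition key.
     */
    static String choose(Object recordKey, int kafkaPartition) {
        if (recordKey != null) {
            String trimmed = recordKey.toString().trim();
            if (!trimmed.isEmpty()) {
                return trimmed;
            }
        }
        return Integer.toString(kafkaPartition);
    }

    public static void main(String[] args) {
        System.out.println(choose(" order-42 ", 7)); // prints "order-42"
        System.out.println(choose("   ", 7));        // prints "7"
        System.out.println(choose(null, 7));         // prints "7"
    }
}
```

Keeping the choice in one pure function makes the blank-key case easy to unit test without a producer or a stream.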


Example 3: addUserRecord

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
		boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

	// If configured use kafka partition key as explicit hash key
	// This will be useful when sending data from same partition into
	// same shard
	if (usePartitionAsHashKey)
		return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
				DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
	else
		return kp.addUserRecord(streamName, partitionKey,
				DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));

}
 
Developer ID: awslabs, Project: kinesis-kafka-connector, Lines of code: 15, Source: AmazonKinesisSinkTask.java


Example 4: onSuccess

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void onSuccess(UserRecordResult result) {
    recordsCompleted.getAndIncrement();
    if (recordsCompleted.get() % NUMBER_OF_ZOMBIES == 0) {
        log.info(format("Records completed: %s; Shard: %s; SequenceNumber: %s.",
                     recordsCompleted.get(), result.getShardId(), result.getSequenceNumber()));
        
    }
}
 
Developer ID: capside, Project: aws-kinesis-zombies, Lines of code: 10, Source: Drone.java
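
Example 4 increments the counter and then reads it again in a separate call. If callbacks run on more than one thread, another increment can slip in between the two calls, so a multiple of the reporting interval may be logged twice or skipped. The sketch below shows one way to avoid that by using the value returned from a single atomic increment. It assumes the counter is an `AtomicLong`; `ProgressCounter` and its methods are hypothetical names, not code from aws-kinesis-zombies.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: ProgressCounter is a hypothetical helper for illustration.
final class ProgressCounter {

    private final AtomicLong completed = new AtomicLong();
    private final long reportEvery;

    ProgressCounter(long reportEvery) {
        if (reportEvery <= 0) {
            throw new IllegalArgumentException("reportEvery must be positive");
        }
        this.reportEvery = reportEvery;
    }

    /**
     * Counts one completed record and returns the new total when that total is
     * a multiple of the reporting interval, or -1 otherwise. The decision uses
     * the value returned by the single atomic increment.
     */
    long recordCompletion() {
        long count = completed.incrementAndGet();
        return (count % reportEvery == 0) ? count : -1;
    }

    long total() {
        return completed.get();
    }

    public static void main(String[] args) {
        ProgressCounter counter = new ProgressCounter(3);
        for (int i = 0; i < 6; i++) {
            long reported = counter.recordCompletion();
            if (reported > 0) {
                System.out.println("Records completed: " + reported);
            }
        }
    }
}
```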


Example 5: putNewRecord

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SneakyThrows
public void putNewRecord(Zombie zombie) {        
    CoordinateUTM utm = zombie.getCurrentPosition();
    CoordinateLatLon latLon = Datum.WGS84.utmToLatLon(utm);
    ZombieLecture lect = new ZombieLecture(id, zombie.getId(), new Date(), latLon.getLat(), latLon.getLon());
    utm.setAccuracy(RADIOUS);
    String partitionKey = utm.getShortForm();
    String json = mapper.writeValueAsString(lect);
    ByteBuffer data = ByteBuffer.wrap(json.getBytes("UTF-8"));
    ListenableFuture<UserRecordResult> f
            = producer.addUserRecord(streamName, partitionKey, data);
    Futures.addCallback(f, this.recordSentCallback);
}
 
Developer ID: capside, Project: aws-kinesis-zombies, Lines of code: 14, Source: Drone.java


Example 6: open

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void open(Configuration parameters) throws Exception {
	super.open(parameters);

	// check and pass the configuration properties
	KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

	producer = getKinesisProducer(producerConfig);
	callback = new FutureCallback<UserRecordResult>() {
		@Override
		public void onSuccess(UserRecordResult result) {
			if (!result.isSuccessful()) {
				if (failOnError) {
					// only remember the first thrown exception
					if (thrownException == null) {
						thrownException = new RuntimeException("Record was not sent successful");
					}
				} else {
					LOG.warn("Record was not sent successful");
				}
			}
		}

		@Override
		public void onFailure(Throwable t) {
			if (failOnError) {
				thrownException = t;
			} else {
				LOG.warn("An exception occurred while processing a record", t);
			}
		}
	};

	if (this.customPartitioner != null) {
		this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
	}

	LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 40, Source: FlinkKinesisProducer.java


Example 7: invoke

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void invoke(OUT value, Context context) throws Exception {
	if (this.producer == null) {
		throw new RuntimeException("Kinesis producer has been closed");
	}

	checkAndPropagateAsyncError();

	String stream = defaultStream;
	String partition = defaultPartition;

	ByteBuffer serialized = schema.serialize(value);

	// maybe set custom stream
	String customStream = schema.getTargetStream(value);
	if (customStream != null) {
		stream = customStream;
	}

	String explicitHashkey = null;
	// maybe set custom partition
	if (customPartitioner != null) {
		partition = customPartitioner.getPartitionId(value);
		explicitHashkey = customPartitioner.getExplicitHashKey(value);
	}

	if (stream == null) {
		if (failOnError) {
			throw new RuntimeException("No target stream set");
		} else {
			LOG.warn("No target stream set. Skipping record");
			return;
		}
	}

	ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
	Futures.addCallback(cb, callback);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 39, Source: FlinkKinesisProducer.java
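
Examples 6 and 7 follow a common pattern: the callback remembers the first asynchronous failure, and the next call to `invoke` rethrows it. The body of `checkAndPropagateAsyncError()` is not shown in the excerpt, so the following is only a sketch of how such a check could work, using the standard library. `AsyncErrorHolder`, `record`, and `checkAndPropagate` are hypothetical names and do not reproduce the Flink implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: AsyncErrorHolder is a hypothetical helper for illustration.
final class AsyncErrorHolder {

    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Called from a callback thread; only the first failure is kept. */
    void record(Throwable t) {
        firstError.compareAndSet(null, t);
    }

    /**
     * Called from the writer thread before accepting the next record.
     * Rethrows the remembered failure, if any, and clears it.
     */
    void checkAndPropagate() {
        Throwable t = firstError.getAndSet(null);
        if (t != null) {
            throw new RuntimeException(
                    "An exception was thrown while processing a record: " + t.getMessage(), t);
        }
    }

    public static void main(String[] args) {
        AsyncErrorHolder holder = new AsyncErrorHolder();
        holder.record(new IllegalStateException("first failure"));
        holder.record(new IllegalStateException("second failure")); // ignored
        try {
            holder.checkAndPropagate();
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

An `AtomicReference` is used here because the callback and the writer typically run on different threads; `compareAndSet(null, t)` keeps the first failure without additional locking.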


Example 8: isAllRecordFuturesCompleted

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private boolean isAllRecordFuturesCompleted() {
	for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
		if (!future.isDone()) {
			return false;
		}
	}

	return true;
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 10, Source: FlinkKinesisProducerTest.java


Example 9: getNumPendingRecordFutures

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private int getNumPendingRecordFutures() {
	int numPending = 0;

	for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
		if (!future.isDone()) {
			numPending++;
		}
	}

	return numPending;
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 12, Source: FlinkKinesisProducerTest.java
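
Examples 8 and 9 walk the same list of futures and differ only in what they return. The sketch below expresses both checks with one loop. To stay runnable without Guava, it uses the standard `CompletableFuture` as a stand-in for `SettableFuture`; `PendingFutures` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Sketch only: PendingFutures is a hypothetical helper; CompletableFuture
// stands in for Guava's SettableFuture.
final class PendingFutures {

    private PendingFutures() {}

    /** Counts the futures that have not completed, failed, or been cancelled. */
    static int countPending(List<? extends Future<?>> futures) {
        int pending = 0;
        for (Future<?> future : futures) {
            if (!future.isDone()) {
                pending++;
            }
        }
        return pending;
    }

    /** True when no future in the list is still pending. */
    static boolean allDone(List<? extends Future<?>> futures) {
        return countPending(futures) == 0;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            futures.add(new CompletableFuture<>());
        }
        futures.get(0).complete("ok");
        System.out.println(countPending(futures)); // prints 2
    }
}
```

Note that `isDone()` also returns true for a future that completed exceptionally, so a failed record counts as no longer pending.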


Example 10: getAndCheck

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private void getAndCheck(Future<UserRecordResult> future) throws StageException {
  try {
    UserRecordResult result = future.get();
    if (!result.isSuccessful()) {
      for (Attempt attempt : result.getAttempts()) {
        LOG.error("Failed to put record: {}", attempt.getErrorMessage());
      }
      throw new StageException(Errors.KINESIS_00, result.getAttempts().get(0).getErrorMessage());
    }
  } catch (InterruptedException | ExecutionException e) {
    LOG.error("Pipeline is shutting down.", e);
    // We should flush if we encounter an error.
    kinesisProducer.flushSync();
  }
}
 
Developer ID: streamsets, Project: datacollector, Lines of code: 16, Source: KinesisTarget.java
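
Example 10 handles `InterruptedException` and `ExecutionException` in one catch block. A common alternative is to treat them separately and restore the thread's interrupt status after an interruption. The following is a minimal sketch of that idea using only the standard library. `FutureResults`, `Outcome`, and `await` are hypothetical names, and this is not how the datacollector project handles the case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Sketch only: FutureResults is a hypothetical helper for illustration.
final class FutureResults {

    /** Simplified outcome of waiting on a future. */
    enum Outcome { COMPLETED, FAILED, INTERRUPTED }

    private FutureResults() {}

    static Outcome await(Future<?> future) {
        try {
            future.get();
            return Outcome.COMPLETED;
        } catch (ExecutionException e) {
            // The asynchronous task itself failed; e.getCause() holds the reason.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can see the interruption.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("put failed"));
        System.out.println(await(CompletableFuture.completedFuture("ok"))); // prints COMPLETED
        System.out.println(await(failed));                                  // prints FAILED
    }
}
```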


Example 11: testRecordTooLarge

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testRecordTooLarge() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(
      KinesisDTarget.class,
      target
  ).setOnRecordError(OnRecordError.TO_ERROR).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  List<Record> records = new ArrayList<>(4);
  records.add(KinesisTestUtil.getTooLargeRecord());
  records.addAll(KinesisTestUtil.getProducerTestRecords(3));
  targetRunner.runWrite(records);

  // Verify we added 3 good records at the end of the batch but not the bad one
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));

  assertEquals(1, targetRunner.getErrorRecords().size());
  targetRunner.runDestroy();
}
 
Developer ID: streamsets, Project: datacollector, Lines of code: 41, Source: TestKinesisTarget.java


Example 12: trackTimestamp

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/** Track the timestamp of the event for determining watermark values until it has been sent or dropped. */
public void trackTimestamp(ListenableFuture<UserRecordResult> f, TripEvent event) {
  Futures.addCallback(f, new RemoveTimestampCallback(event));
}
 
Developer ID: awslabs, Project: flink-stream-processing-refarch, Lines of code: 5, Source: WatermarkTracker.java


Example 13: onSuccess

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void onSuccess(UserRecordResult result) {
  removeEvent();
}
 
Developer ID: awslabs, Project: flink-stream-processing-refarch, Lines of code: 5, Source: WatermarkTracker.java


Example 14: testAsyncErrorRethrownAfterFlush

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/**
 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
 *
 * <p>Note that this test does not test the snapshot method is blocked correctly when there are pending records.
 * The test for that is covered in testAtLeastOnceProducer.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test(timeout = 10000)
public void testAsyncErrorRethrownAfterFlush() throws Throwable {
	final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

	OneInputStreamOperatorTestHarness<String, Object> testHarness =
		new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

	testHarness.open();

	testHarness.processElement(new StreamRecord<>("msg-1"));
	testHarness.processElement(new StreamRecord<>("msg-2"));
	testHarness.processElement(new StreamRecord<>("msg-3"));

	// only let the first record succeed for now
	UserRecordResult result = mock(UserRecordResult.class);
	when(result.isSuccessful()).thenReturn(true);
	producer.getPendingRecordFutures().get(0).set(result);

	CheckedThread snapshotThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			// this should block at first, since there are still two pending records that need to be flushed
			testHarness.snapshot(123L, 123L);
		}
	};
	snapshotThread.start();

	// let the 2nd message fail with an async exception
	producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
	producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));

	try {
		snapshotThread.sync();
	} catch (Exception e) {
		// after the flush, the async exception should have been rethrown
		Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());

		// test succeeded
		return;
	}

	Assert.fail();
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 52, Source: FlinkKinesisProducerTest.java


Example 15: testAtLeastOnceProducer

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/**
 * Test ensuring that the producer is not dropping buffered records;
 * we set a timeout because the test will not finish if the logic is broken.
 */
@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
	final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());

	OneInputStreamOperatorTestHarness<String, Object> testHarness =
		new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));

	testHarness.open();

	testHarness.processElement(new StreamRecord<>("msg-1"));
	testHarness.processElement(new StreamRecord<>("msg-2"));
	testHarness.processElement(new StreamRecord<>("msg-3"));

	// start a thread to perform checkpointing
	CheckedThread snapshotThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			// this should block until all records are flushed;
			// if the snapshot implementation returns before pending records are flushed,
			// the isAlive() assertions in the test body will fail
			testHarness.snapshot(123L, 123L);
		}
	};
	snapshotThread.start();

	// before proceeding, make sure that flushing has started and that the snapshot is still blocked;
	// this would block forever if the snapshot didn't perform a flush
	producer.waitUntilFlushStarted();
	Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

	// now, complete the callbacks
	UserRecordResult result = mock(UserRecordResult.class);
	when(result.isSuccessful()).thenReturn(true);

	producer.getPendingRecordFutures().get(0).set(result);
	Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

	producer.getPendingRecordFutures().get(1).set(result);
	Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());

	producer.getPendingRecordFutures().get(2).set(result);

	// this would fail with an exception if flushing wasn't completed before the snapshot method returned
	snapshotThread.sync();

	testHarness.close();
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 52, Source: FlinkKinesisProducerTest.java


Example 16: getPendingRecordFutures

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
	return pendingRecordFutures;
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 4, Source: FlinkKinesisProducerTest.java


Example 17: open

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void open(Configuration parameters) throws Exception {
	super.open(parameters);

	KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();

	producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
	producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
	if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
		producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
				ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
	}
	if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
		producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
				ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
	}

	producer = new KinesisProducer(producerConfig);
	callback = new FutureCallback<UserRecordResult>() {
		@Override
		public void onSuccess(UserRecordResult result) {
			if (!result.isSuccessful()) {
				if(failOnError) {
					thrownException = new RuntimeException("Record was not sent successful");
				} else {
					LOG.warn("Record was not sent successful");
				}
			}
		}

		@Override
		public void onFailure(Throwable t) {
			if (failOnError) {
				thrownException = t;
			} else {
				LOG.warn("An exception occurred while processing a record", t);
			}
		}
	};

	if (this.customPartitioner != null) {
		this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
	}

	LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 47, Source: FlinkKinesisProducer.java


Example 18: invoke

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void invoke(OUT value) throws Exception {
	if (this.producer == null) {
		throw new RuntimeException("Kinesis producer has been closed");
	}
	if (thrownException != null) {
		String errorMessages = "";
		if (thrownException instanceof UserRecordFailedException) {
			List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
			for (Attempt attempt: attempts) {
				if (attempt.getErrorMessage() != null) {
					errorMessages += attempt.getErrorMessage() +"\n";
				}
			}
		}
		if (failOnError) {
			throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
		} else {
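			// Pass the message for the {} placeholder first and the throwable last so its stack trace is logged.
			LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);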
			thrownException = null; // reset
		}
	}

	String stream = defaultStream;
	String partition = defaultPartition;

	ByteBuffer serialized = schema.serialize(value);

	// maybe set custom stream
	String customStream = schema.getTargetStream(value);
	if (customStream != null) {
		stream = customStream;
	}

	String explicitHashkey = null;
	// maybe set custom partition
	if (customPartitioner != null) {
		partition = customPartitioner.getPartitionId(value);
		explicitHashkey = customPartitioner.getExplicitHashKey(value);
	}

	if (stream == null) {
		if (failOnError) {
			throw new RuntimeException("No target stream set");
		} else {
			LOG.warn("No target stream set. Skipping record");
			return;
		}
	}

	ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
	Futures.addCallback(cb, callback);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 54, Source: FlinkKinesisProducer.java
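
Example 18 builds its error text by appending each attempt's message in a loop with `+=`. The sketch below shows the same collection step with a stream and `Collectors.joining`. So that it runs without the Kinesis Producer Library, `FakeAttempt` is a hypothetical stand-in that models only `getErrorMessage()`; `AttemptMessages` and `joinErrorMessages` are likewise names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Sketch only: AttemptMessages and FakeAttempt are hypothetical names.
final class AttemptMessages {

    /** Stand-in for the library's Attempt type; only the error message is modeled. */
    static final class FakeAttempt {
        private final String errorMessage;

        FakeAttempt(String errorMessage) {
            this.errorMessage = errorMessage;
        }

        String getErrorMessage() {
            return errorMessage;
        }
    }

    private AttemptMessages() {}

    /** Joins the non-null error messages with newlines, in attempt order. */
    static String joinErrorMessages(List<FakeAttempt> attempts) {
        return attempts.stream()
                .map(FakeAttempt::getErrorMessage)
                .filter(Objects::nonNull)
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        List<FakeAttempt> attempts = Arrays.asList(
                new FakeAttempt("Rate exceeded"),
                new FakeAttempt(null),
                new FakeAttempt("Record expired"));
        System.out.println(joinErrorMessages(attempts));
    }
}
```

Unlike the loop in Example 18, `Collectors.joining("\n")` places the separator only between messages, so the result has no trailing newline.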


Example 19: testInOrderProduce

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testInOrderProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();
  config.preserveOrdering = true;

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  PowerMockito.mockStatic(KinesisUtil.class);

  when(KinesisUtil.checkStreamExists(
      any(ClientConfiguration.class),
      any(KinesisConfigBean.class),
      any(String.class),
      any(List.class),
      any(Stage.Context.class)
      )
  ).thenReturn(1L);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);
  when(result.getShardId()).thenReturn("shardId-000000000000");

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records to stream test
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // With preserveOrdering we should call flushSync for each record, plus once more for the batch.
  // The last invocation has no effect as no records should be pending.
  verify(producer, times(4)).flushSync();

  targetRunner.runDestroy();
}
 
Developer ID: streamsets, Project: datacollector, Lines of code: 48, Source: TestKinesisTarget.java


Example 20: testDefaultProduce

import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testDefaultProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // By default we should only call flushSync one time per batch.
  verify(producer, times(1)).flushSync();

  targetRunner.runDestroy();
}
 
Developer ID: streamsets, Project: datacollector, Lines of code: 36, Source: TestKinesisTarget.java



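Note: The com.amazonaws.services.kinesis.producer.UserRecordResult examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.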

