This article collects typical usage examples of the Java class com.amazonaws.services.kinesis.model.GetShardIteratorRequest. If you are wondering what GetShardIteratorRequest is for, how to use it, or what real-world usage looks like, the curated examples here may help.
GetShardIteratorRequest belongs to the com.amazonaws.services.kinesis.model package. The 19 code examples below are sorted by popularity by default.
Example 1: getShardIterator
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
public String getShardIterator(final String streamName, final String shardId,
final ShardIteratorType shardIteratorType,
final String startingSequenceNumber, final Instant timestamp)
throws TransientKinesisException {
final Date date = timestamp != null ? timestamp.toDate() : null;
return wrapExceptions(new Callable<String>() {
@Override
public String call() throws Exception {
return kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(streamName)
.withShardId(shardId)
.withShardIteratorType(shardIteratorType)
.withStartingSequenceNumber(startingSequenceNumber)
.withTimestamp(date)
).getShardIterator();
}
});
}
Developer ID: apache, Project: beam, Lines of code: 20, Source: SimplifiedKinesisClient.java
Example 2: shouldReturnIteratorStartingWithTimestamp
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
Instant timestamp = Instant.now();
given(kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.withTimestamp(timestamp.toDate())
)).willReturn(new GetShardIteratorResult()
.withShardIterator(SHARD_ITERATOR));
String stream = underTest.getShardIterator(STREAM, SHARD_1,
ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
Developer ID: apache, Project: beam, Lines of code: 17, Source: SimplifiedKinesisClientTest.java
Example 3: shouldHandleGetShardIteratorError
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
private void shouldHandleGetShardIteratorError(
Exception thrownException,
Class<? extends Exception> expectedExceptionClass) {
GetShardIteratorRequest request = new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.LATEST);
given(kinesis.getShardIterator(request)).willThrow(thrownException);
try {
underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
failBecauseExceptionWasNotThrown(expectedExceptionClass);
} catch (Exception e) {
assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
} finally {
reset(kinesis);
}
}
Developer ID: apache, Project: beam, Lines of code: 20, Source: SimplifiedKinesisClientTest.java
Example 4: getShardIterator
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;
int attempt = 0;
while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
try {
getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
} else {
throw ex;
}
}
}
if (getShardIteratorResult == null) {
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
}
Developer ID: axbaretto, Project: flink, Lines of code: 27, Source: KinesisProxy.java
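Example 4 calls a `fullJitterBackoff` helper whose body is not shown. The following is a minimal sketch of how such a helper could look, assuming the common "full jitter" scheme: cap an exponentially growing delay, then wait a uniformly random time between zero and that cap. The class name `BackoffSketch` and the injected `Random` parameter are illustrative assumptions and may differ from what the Flink project actually does.

```java
import java.util.Random;

// Hypothetical helper; illustrates full-jitter backoff, not the Flink source.
final class BackoffSketch {

    private BackoffSketch() {}

    /**
     * Returns a random delay that never exceeds
     * min(maxMillis, baseMillis * power^attempt).
     *
     * @param baseMillis delay ceiling for the first attempt (attempt == 0)
     * @param maxMillis  upper bound on the ceiling, regardless of attempt
     * @param power      exponential growth factor, e.g. 1.5 or 2.0
     * @param attempt    zero-based retry counter
     * @param random     randomness source, injectable so tests can seed it
     */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power,
                                  int attempt, Random random) {
        double ceiling = Math.min((double) maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (random.nextDouble() * ceiling);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep "
                    + fullJitterBackoff(300L, 1000L, 1.5, attempt, random) + " ms");
        }
    }
}
```

Randomizing the whole interval, rather than adding a small jitter on top of a fixed delay, spreads retries from many concurrent consumers so they are less likely to hit the rate limit again at the same moment.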
Example 5: setup
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Before
public void setup() throws Exception {
KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
endpoint.setAmazonKinesisClient(kinesisClient);
endpoint.setIteratorType(ShardIteratorType.LATEST);
undertest = new KinesisConsumer(endpoint, processor);
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
.thenReturn(new GetRecordsResult()
.withNextShardIterator("nextShardIterator")
);
when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
.thenReturn(new DescribeStreamResult()
.withStreamDescription(new StreamDescription()
.withShards(new Shard().withShardId("shardId"))
)
);
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(new GetShardIteratorResult()
.withShardIterator("shardIterator")
);
}
Developer ID: HydAu, Project: Camel, Lines of code: 23, Source: KinesisConsumerTest.java
Example 6: itObtainsAShardIteratorOnFirstPollForSequenceNumber
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
undertest.getEndpoint().setSequenceNumber("12345");
undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
undertest.poll();
final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(kinesisClient).describeStream(describeStreamReqCap.capture());
assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName"));
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER"));
assertThat(getShardIteratorReqCap.getValue().getStartingSequenceNumber(), is("12345"));
}
Developer ID: HydAu, Project: Camel, Lines of code: 21, Source: KinesisConsumerTest.java
Example 7: getPreviewRecords
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
ClientConfiguration awsClientConfig,
KinesisConfigBean conf,
int maxBatchSize,
GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);
GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
String shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxBatchSize);
GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
return getRecordsResult.getRecords();
}
Developer ID: streamsets, Project: datacollector, Lines of code: 19, Source: KinesisUtil.java
Example 8: start
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Override
public void start(Map<String, String> settings) {
this.config = new KinesisSourceConnectorConfig(settings);
this.kinesisClient = this.kinesisClientFactory.create(this.config);
this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId);
Map<String, Object> lastOffset = this.context.offsetStorageReader().offset(this.sourcePartition);
GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest()
.withShardId(this.config.kinesisShardId)
.withStreamName(this.config.kinesisStreamName);
if (null != lastOffset && !lastOffset.isEmpty()) {
String startingSequenceNumber = (String) lastOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER);
log.info("start() - Starting iterator after last processed sequence number of '{}'", startingSequenceNumber);
shardIteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
shardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber);
} else {
log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId);
shardIteratorRequest.setShardIteratorType(this.config.kinesisPosition);
}
GetShardIteratorResult shardIteratorResult = this.kinesisClient.getShardIterator(shardIteratorRequest);
log.info("start() - Using Shard Iterator {}", shardIteratorResult.getShardIterator());
this.recordsRequest = new GetRecordsRequest()
.withLimit(this.config.kinesisRecordLimit)
.withShardIterator(shardIteratorResult.getShardIterator());
this.recordConverter = new RecordConverter(this.config);
}
Developer ID: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 32, Source: KinesisSourceTask.java
Example 9: sourceOffsets
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void sourceOffsets() throws InterruptedException {
final String SEQUENCE_NUMBER = "asdfasdfddsa";
Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
when(this.kinesisClient.getShardIterator(any())).thenReturn(
new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
);
this.task.start(settings);
GetRecordsResult recordsResult = new GetRecordsResult()
.withNextShardIterator("dsfargadsfasdfasda")
.withRecords(TestData.record())
.withMillisBehindLatest(0L);
when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);
List<SourceRecord> records = this.task.poll();
assertNotNull(records, "records should not be null.");
assertFalse(records.isEmpty(), "records should not be empty.");
verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());
GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest()
.withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
.withShardId(this.config.kinesisShardId)
.withStreamName(this.config.kinesisStreamName)
.withStartingSequenceNumber(SEQUENCE_NUMBER);
verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest);
}
Developer ID: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 33, Source: KinesisSourceTaskTest.java
Example 10: shouldReturnIteratorStartingWithSequenceNumber
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
given(kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.withStartingSequenceNumber(SEQUENCE_NUMBER)
)).willReturn(new GetShardIteratorResult()
.withShardIterator(SHARD_ITERATOR));
String stream = underTest.getShardIterator(STREAM, SHARD_1,
ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
Developer ID: apache, Project: beam, Lines of code: 16, Source: SimplifiedKinesisClientTest.java
Example 11: getShardIterator
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Override
public GetShardIteratorResult getShardIterator(
GetShardIteratorRequest getShardIteratorRequest) {
ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
getShardIteratorRequest.getShardIteratorType());
String shardIterator;
if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
} else {
throw new RuntimeException("Not implemented");
}
return new GetShardIteratorResult().withShardIterator(shardIterator);
}
Developer ID: apache, Project: beam, Lines of code: 16, Source: AmazonKinesisMock.java
Example 12: getShardItertor
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
private String getShardItertor() {
// either return a cached one or get a new one via a GetShardIterator request.
if (currentShardIterator == null) {
String shardId;
//If ShardId supplied use it, else choose first one
if (!getEndpoint().getShardId().isEmpty()) {
shardId = getEndpoint().getShardId();
} else {
DescribeStreamRequest req1 = new DescribeStreamRequest()
.withStreamName(getEndpoint().getStreamName());
DescribeStreamResult res1 = getClient().describeStream(req1);
shardId = res1.getStreamDescription().getShards().get(0).getShardId();
}
LOG.debug("ShardId is: {}", shardId);
GetShardIteratorRequest req = new GetShardIteratorRequest()
.withStreamName(getEndpoint().getStreamName())
.withShardId(shardId)
.withShardIteratorType(getEndpoint().getIteratorType());
if (hasSequenceNumber()) {
req.withStartingSequenceNumber(getEndpoint().getSequenceNumber());
}
GetShardIteratorResult result = getClient().getShardIterator(req);
currentShardIterator = result.getShardIterator();
}
LOG.debug("Shard Iterator is: {}", currentShardIterator);
return currentShardIterator;
}
Developer ID: HydAu, Project: Camel, Lines of code: 32, Source: KinesisConsumer.java
Example 13: itObtainsAShardIteratorOnFirstPoll
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
undertest.poll();
final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(kinesisClient).describeStream(describeStreamReqCap.capture());
assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName"));
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("LATEST"));
}
Developer ID: HydAu, Project: Camel, Lines of code: 16, Source: KinesisConsumerTest.java
Example 14: itDoesNotMakeADescribeStreamRequestIfShardIdIsSet
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
undertest.getEndpoint().setShardId("shardIdPassedAsUrlParam");
undertest.poll();
verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class));
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardIdPassedAsUrlParam"));
assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("LATEST"));
}
Developer ID: HydAu, Project: Camel, Lines of code: 16, Source: KinesisConsumerTest.java
Example 15: itUsesTheShardIteratorOnSubsiquentPolls
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Test
public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
undertest.poll();
undertest.poll();
final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class));
verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
assertThat(getRecordsReqCap.getAllValues().get(0).getShardIterator(), is("shardIterator"));
assertThat(getRecordsReqCap.getAllValues().get(1).getShardIterator(), is("nextShardIterator"));
}
Developer ID: HydAu, Project: Camel, Lines of code: 14, Source: KinesisConsumerTest.java
Example 16: previewProcess
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
private void previewProcess(
int maxBatchSize,
BatchMaker batchMaker
) throws IOException, StageException {
ClientConfiguration awsClientConfig = AWSUtil.getClientConfiguration(conf.proxyConfig);
String shardId = KinesisUtil.getLastShardId(awsClientConfig, conf, conf.streamName);
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(conf.streamName);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(conf.initialPositionInStream.name());
List<com.amazonaws.services.kinesis.model.Record> results = KinesisUtil.getPreviewRecords(
awsClientConfig,
conf,
Math.min(conf.maxBatchSize, maxBatchSize),
getShardIteratorRequest
);
int batchSize = results.size() > maxBatchSize ? maxBatchSize : results.size();
for (int index = 0; index < batchSize; index++) {
com.amazonaws.services.kinesis.model.Record record = results.get(index);
UserRecord userRecord = new UserRecord(record);
KinesisUtil.processKinesisRecord(
getShardIteratorRequest.getShardId(),
userRecord,
parserFactory
).forEach(batchMaker::addRecord);
}
}
Developer ID: streamsets, Project: datacollector, Lines of code: 33, Source: KinesisSource.java
Example 17: getIterator
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
private void getIterator()
throws ResourceNotFoundException
{
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(split.getStreamName());
getShardIteratorRequest.setShardId(split.getShardId());
// Explanation: when we have a sequence number from a prior read or checkpoint, always use it.
// Otherwise, decide whether to start at a timestamp or at the trim horizon based on configuration.
// If starting at a timestamp, use the session variable ITER_START_TIMESTAMP when given; otherwise
// fall back on starting ITER_OFFSET_SECONDS before the current time.
if (lastReadSeqNo == null) {
// Important: shard iterator type AT_TIMESTAMP requires 1.11.x or above of the AWS SDK.
if (SessionVariables.getIterFromTimestamp(session)) {
getShardIteratorRequest.setShardIteratorType("AT_TIMESTAMP");
long iterStartTs = SessionVariables.getIterStartTimestamp(session);
if (iterStartTs == 0) {
long startTs = System.currentTimeMillis() - (SessionVariables.getIterOffsetSeconds(session) * 1000);
getShardIteratorRequest.setTimestamp(new Date(startTs));
}
else {
getShardIteratorRequest.setTimestamp(new Date(iterStartTs));
}
}
else {
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
}
}
else {
getShardIteratorRequest.setShardIteratorType("AFTER_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(lastReadSeqNo);
}
GetShardIteratorResult getShardIteratorResult = clientManager.getClient().getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
}
Developer ID: qubole, Project: presto-kinesis, Lines of code: 37, Source: KinesisRecordSet.java
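The decision logic in Example 17 can be hard to follow because it is interleaved with request setters. The sketch below restates the same branching as a pure function with no AWS SDK dependency, so it can be unit-tested on its own. The names `StartPositionSketch`, `StartPosition`, and `choose` are hypothetical, and the parameters stand in for the session variables the example reads.

```java
// Hypothetical, SDK-free model of the start-position decision in Example 17.
final class StartPositionSketch {

    /** Mirrors the iterator type names the example passes as strings. */
    enum IteratorType { AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON }

    /** Result holder: the type plus whichever extra field that type needs. */
    static final class StartPosition {
        final IteratorType type;
        final String sequenceNumber; // set only for AFTER_SEQUENCE_NUMBER
        final Long timestampMillis;  // set only for AT_TIMESTAMP

        StartPosition(IteratorType type, String sequenceNumber, Long timestampMillis) {
            this.type = type;
            this.sequenceNumber = sequenceNumber;
            this.timestampMillis = timestampMillis;
        }
    }

    private StartPositionSketch() {}

    static StartPosition choose(String lastReadSeqNo, boolean iterFromTimestamp,
                                long iterStartTimestampMillis, long iterOffsetSeconds,
                                long nowMillis) {
        if (lastReadSeqNo != null) {
            // A sequence number from a prior read or checkpoint always wins.
            return new StartPosition(IteratorType.AFTER_SEQUENCE_NUMBER, lastReadSeqNo, null);
        }
        if (!iterFromTimestamp) {
            return new StartPosition(IteratorType.TRIM_HORIZON, null, null);
        }
        // 0 means "no explicit start timestamp": fall back to now minus the offset.
        long start = iterStartTimestampMillis != 0L
                ? iterStartTimestampMillis
                : nowMillis - iterOffsetSeconds * 1000L;
        return new StartPosition(IteratorType.AT_TIMESTAMP, null, start);
    }

    public static void main(String[] args) {
        StartPosition p = choose(null, true, 0L, 60L, System.currentTimeMillis());
        System.out.println(p.type + " @ " + p.timestampMillis);
    }
}
```

Passing the current time in as `nowMillis` is a design choice for testability; a caller would typically supply `System.currentTimeMillis()` and then copy the result onto a `GetShardIteratorRequest`.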
Example 18: getShardIterator
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
@Override
public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws AmazonServiceException, AmazonClientException
{
ShardIterator iter = ShardIterator.fromStreamAndShard(getShardIteratorRequest.getStreamName(), getShardIteratorRequest.getShardId());
if (iter != null) {
InternalStream theStream = this.getStream(iter.streamId);
if (theStream != null) {
String seqAsString = getShardIteratorRequest.getStartingSequenceNumber();
if (seqAsString != null && !seqAsString.isEmpty() && getShardIteratorRequest.getShardIteratorType().equals("AFTER_SEQUENCE_NUMBER")) {
int sequence = Integer.parseInt(seqAsString);
iter.recordIndex = sequence + 1;
}
else {
iter.recordIndex = 100;
}
GetShardIteratorResult result = new GetShardIteratorResult();
return result.withShardIterator(iter.makeString());
}
else {
throw new AmazonClientException("Unknown stream or bad shard iterator!");
}
}
else {
throw new AmazonClientException("Bad stream or shard iterator!");
}
}
Developer ID: qubole, Project: presto-kinesis, Lines of code: 28, Source: MockKinesisClient.java
Example 19: main
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; // import the required package/class
public static void main(String[] args) {
AmazonKinesisClient kinesisClient = Helper.setupKinesisClient();
// Retrieve the Shards from a Stream
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName"));
DescribeStreamResult describeStreamResult;
List<Shard> shards = new ArrayList<>();
String lastShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(lastShardId);
describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (shards.size() > 0) {
lastShardId = shards.get(shards.size() - 1).getShardId();
}
} while (describeStreamResult.getStreamDescription().getHasMoreShards());
// Get Data from the Shards in a Stream
// Hard-coded to use only 1 shard
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName"));
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
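// Continuously read data records from shard.
// getNextShardIterator() returns null once a closed shard has been fully read, so stop then.
List<Record> records;
while (shardIterator != null) {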
// Create new GetRecordsRequest with existing shardIterator.
// Set maximum records to return to 1000.
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(1000);
GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest);
// Put result into record list. Result may be empty.
records = result.getRecords();
// Print records
for (Record record : records) {
ByteBuffer byteBuffer = record.getData();
System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(),
new String(byteBuffer.array())));
}
try {
Thread.sleep(1000);
} catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
shardIterator = result.getNextShardIterator();
}
}
Developer ID: twitterdev, Project: twttr-kinesis, Lines of code: 61, Source: TweetProcessor.java
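Example 19 turns each record payload into text with `new String(byteBuffer.array())`. `ByteBuffer.array()` returns the whole backing array regardless of the buffer's position and limit, and it throws for read-only or direct buffers; the charset-less `String` constructor also depends on the platform default. A more defensive decoding could look like the sketch below, which assumes the payload is UTF-8 text. `PayloadDecodeSketch` and `decodeUtf8` are hypothetical names, not part of the original project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; assumes record payloads are UTF-8 text.
final class PayloadDecodeSketch {

    private PayloadDecodeSketch() {}

    static String decodeUtf8(ByteBuffer data) {
        // Decode a read-only view so the caller's position and limit stay untouched.
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        byte[] backing = "xxhelloyy".getBytes(StandardCharsets.UTF_8);
        ByteBuffer window = ByteBuffer.wrap(backing, 2, 5); // only "hello" is visible
        System.out.println(decodeUtf8(window)); // prints "hello"
        System.out.println(new String(window.array(), StandardCharsets.UTF_8)); // prints "xxhelloyy"
    }
}
```

In the loop from Example 19, the call would be `decodeUtf8(record.getData())` in place of `new String(byteBuffer.array())`.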
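Note: The com.amazonaws.services.kinesis.model.GetShardIteratorRequest examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.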