本文整理汇总了Java中com.amazonaws.services.kinesis.model.ShardIteratorType类的典型用法代码示例。如果您正苦于以下问题:Java ShardIteratorType类的具体用法?Java ShardIteratorType怎么用?Java ShardIteratorType使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ShardIteratorType类属于com.amazonaws.services.kinesis.model包,在下文中一共展示了ShardIteratorType类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: getShardIterator
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
public String getShardIterator(final String streamName, final String shardId,
final ShardIteratorType shardIteratorType,
final String startingSequenceNumber, final Instant timestamp)
throws TransientKinesisException {
final Date date = timestamp != null ? timestamp.toDate() : null;
return wrapExceptions(new Callable<String>() {
@Override
public String call() throws Exception {
return kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(streamName)
.withShardId(shardId)
.withShardIteratorType(shardIteratorType)
.withStartingSequenceNumber(startingSequenceNumber)
.withTimestamp(date)
).getShardIterator();
}
});
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:SimplifiedKinesisClient.java
示例2: ShardCheckpoint
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
this.streamName = checkNotNull(streamName, "streamName");
this.shardId = checkNotNull(shardId, "shardId");
if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
checkNotNull(sequenceNumber,
"You must provide sequence number for AT_SEQUENCE_NUMBER"
+ " or AFTER_SEQUENCE_NUMBER");
} else {
checkArgument(sequenceNumber == null,
"Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
}
if (shardIteratorType == AT_TIMESTAMP) {
checkNotNull(timestamp,
"You must provide timestamp for AT_TIMESTAMP");
} else {
checkArgument(timestamp == null,
"Timestamp must be null for an iterator type other than AT_TIMESTAMP");
}
this.subSequenceNumber = subSequenceNumber;
this.sequenceNumber = sequenceNumber;
this.timestamp = timestamp;
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:ShardCheckpoint.java
示例3: shouldReturnIteratorStartingWithTimestamp
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
Instant timestamp = Instant.now();
given(kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.withTimestamp(timestamp.toDate())
)).willReturn(new GetShardIteratorResult()
.withShardIterator(SHARD_ITERATOR));
String stream = underTest.getShardIterator(STREAM, SHARD_1,
ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:SimplifiedKinesisClientTest.java
示例4: shouldHandleGetShardIteratorError
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
private void shouldHandleGetShardIteratorError(
Exception thrownException,
Class<? extends Exception> expectedExceptionClass) {
GetShardIteratorRequest request = new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.LATEST);
given(kinesis.getShardIterator(request)).willThrow(thrownException);
try {
underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
failBecauseExceptionWasNotThrown(expectedExceptionClass);
} catch (Exception e) {
assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
} finally {
reset(kinesis);
}
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:SimplifiedKinesisClientTest.java
示例5: getRecords
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
/**
* Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
* AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
* such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
* be used for the next call to this method.
*
* <p>Note: it is important that this method is not called again before all the records from the last result have been
* fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
* {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
* incorrect shard iteration if the iterator had to be refreshed.
*
* @param shardItr shard iterator to use
* @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
* @return get records result
* @throws InterruptedException
*/
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
GetRecordsResult getRecordsResult = null;
while (getRecordsResult == null) {
try {
getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
// Update millis behind latest so it gets reported by the millisBehindLatest gauge
shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
} catch (ExpiredIteratorException eiEx) {
LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
" refreshing the iterator ...", shardItr, subscribedShard);
shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
if (fetchIntervalMillis != 0) {
Thread.sleep(fetchIntervalMillis);
}
}
}
return getRecordsResult;
}
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:ShardConsumer.java
示例6: getRecords
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
/**
* Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
* AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
* such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
* be used for the next call to this method.
*
* Note: it is important that this method is not called again before all the records from the last result have been
* fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
* {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
* incorrect shard iteration if the iterator had to be refreshed.
*
* @param shardItr shard iterator to use
* @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
* @return get records result
* @throws InterruptedException
*/
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
GetRecordsResult getRecordsResult = null;
while (getRecordsResult == null) {
try {
getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
} catch (ExpiredIteratorException eiEx) {
LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
" refreshing the iterator ...", shardItr, subscribedShard);
shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
if (fetchIntervalMillis != 0) {
Thread.sleep(fetchIntervalMillis);
}
}
}
return getRecordsResult;
}
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:ShardConsumer.java
示例7: setup
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Before
public void setup() throws Exception {
KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
endpoint.setAmazonKinesisClient(kinesisClient);
endpoint.setIteratorType(ShardIteratorType.LATEST);
undertest = new KinesisConsumer(endpoint, processor);
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
.thenReturn(new GetRecordsResult()
.withNextShardIterator("nextShardIterator")
);
when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
.thenReturn(new DescribeStreamResult()
.withStreamDescription(new StreamDescription()
.withShards(new Shard().withShardId("shardId"))
)
);
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(new GetShardIteratorResult()
.withShardIterator("shardIterator")
);
}
开发者ID:HydAu,项目名称:Camel,代码行数:23,代码来源:KinesisConsumerTest.java
示例8: itObtainsAShardIteratorOnFirstPollForSequenceNumber
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
undertest.getEndpoint().setSequenceNumber("12345");
undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
undertest.poll();
final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(kinesisClient).describeStream(describeStreamReqCap.capture());
assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName"));
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER"));
assertThat(getShardIteratorReqCap.getValue().getStartingSequenceNumber(), is("12345"));
}
开发者ID:HydAu,项目名称:Camel,代码行数:21,代码来源:KinesisConsumerTest.java
示例9: allTheEndpointParams
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void allTheEndpointParams() throws Exception {
KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ "?amazonKinesisClient=#kinesisClient"
+ "&maxResultsPerRequest=101"
+ "&iteratorType=latest"
+ "&shardId=abc"
+ "&sequenceNumber=123"
);
assertThat(endpoint.getClient(), is(amazonKinesisClient));
assertThat(endpoint.getStreamName(), is("some_stream_name"));
assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST));
assertThat(endpoint.getMaxResultsPerRequest(), is(101));
assertThat(endpoint.getSequenceNumber(), is("123"));
assertThat(endpoint.getShardId(), is("abc"));
}
开发者ID:HydAu,项目名称:Camel,代码行数:18,代码来源:KinesisEndpointTest.java
示例10: testAutoCreateStreamForNonExistingStream
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void testAutoCreateStreamForNonExistingStream() throws Exception {
KinesisTestBinder binder = getBinder();
DirectChannel output = createBindableChannel("output", new BindingProperties());
ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
Date testDate = new Date();
consumerProperties.getExtension()
.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP.name() + ":" + testDate.getTime());
String testStreamName = "nonexisting" + System.currentTimeMillis();
Binding<?> binding = binder.bindConsumer(testStreamName, "test", output, consumerProperties);
binding.unbind();
DescribeStreamResult streamResult = localKinesisResource.getResource().describeStream(testStreamName);
String createdStreamName = streamResult.getStreamDescription().getStreamName();
int createdShards = streamResult.getStreamDescription().getShards().size();
String createdStreamStatus = streamResult.getStreamDescription().getStreamStatus();
assertThat(createdStreamName).isEqualTo(testStreamName);
assertThat(createdShards)
.isEqualTo(consumerProperties.getInstanceCount() * consumerProperties.getConcurrency());
assertThat(createdStreamStatus).isEqualTo(StreamStatus.ACTIVE.toString());
KinesisShardOffset shardOffset =
TestUtils.getPropertyValue(binding, "lifecycle.streamInitialSequence", KinesisShardOffset.class);
assertThat(shardOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP);
assertThat(shardOffset.getTimestamp()).isEqualTo(testDate);
}
开发者ID:spring-cloud,项目名称:spring-cloud-stream-binder-aws-kinesis,代码行数:28,代码来源:KinesisBinderTests.java
示例11: start
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Override
public void start(Map<String, String> settings) {
this.config = new KinesisSourceConnectorConfig(settings);
this.kinesisClient = this.kinesisClientFactory.create(this.config);
this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId);
Map<String, Object> lastOffset = this.context.offsetStorageReader().offset(this.sourcePartition);
GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest()
.withShardId(this.config.kinesisShardId)
.withStreamName(this.config.kinesisStreamName);
if (null != lastOffset && !lastOffset.isEmpty()) {
String startingSequenceNumber = (String) lastOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER);
log.info("start() - Starting iterator after last processed sequence number of '{}'", startingSequenceNumber);
shardIteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
shardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber);
} else {
log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId);
shardIteratorRequest.setShardIteratorType(this.config.kinesisPosition);
}
GetShardIteratorResult shardIteratorResult = this.kinesisClient.getShardIterator(shardIteratorRequest);
log.info("start() - Using Shard Iterator {}", shardIteratorResult.getShardIterator());
this.recordsRequest = new GetRecordsRequest()
.withLimit(this.config.kinesisRecordLimit)
.withShardIterator(shardIteratorResult.getShardIterator());
this.recordConverter = new RecordConverter(this.config);
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:32,代码来源:KinesisSourceTask.java
示例12: KinesisSourceConnectorConfig
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
public KinesisSourceConnectorConfig(Map<String, String> parsedConfig) {
super(config(), parsedConfig);
this.awsAccessKeyId = this.getString(AWS_ACCESS_KEY_ID_CONF);
this.awsSecretKeyId = this.getPassword(AWS_SECRET_KEY_ID_CONF).value();
this.kafkaTopic = this.getString(TOPIC_CONF);
this.kinesisStreamName = this.getString(STREAM_NAME_CONF);
this.kinesisPosition = ConfigUtils.getEnum(ShardIteratorType.class, this, KINESIS_POSISTION_CONF);
this.kinesisRegion = ConfigUtils.getEnum(Regions.class, this, KINESIS_REGION_CONF);
this.kinesisShardId = this.getString(KINESIS_SHARD_ID_CONF);
this.kinesisRecordLimit = this.getInt(KINESIS_RECORD_LIMIT_CONF);
this.kinesisEmptyRecordsBackoffMs = this.getLong(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF);
this.kinesisThroughputExceededBackoffMs = this.getLong(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF);
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:14,代码来源:KinesisSourceConnectorConfig.java
示例13: config
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
public static ConfigDef config() {
return new ConfigDef()
.define(AWS_ACCESS_KEY_ID_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, AWS_ACCESS_KEY_ID_DOC)
.define(AWS_SECRET_KEY_ID_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, AWS_SECRET_KEY_ID_DOC)
.define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(STREAM_NAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, STREAM_NAME_DOC)
.define(KINESIS_POSISTION_CONF, ConfigDef.Type.STRING, ShardIteratorType.TRIM_HORIZON.toString(), ValidEnum.of(ShardIteratorType.class), ConfigDef.Importance.MEDIUM, KINESIS_POSISTION_DOC)
.define(KINESIS_REGION_CONF, ConfigDef.Type.STRING, Regions.US_EAST_1.toString(), ValidEnum.of(Regions.class), ConfigDef.Importance.MEDIUM, KINESIS_REGION_DOC)
.define(KINESIS_SHARD_ID_CONF, ConfigDef.Type.STRING, ".*", ConfigDef.Importance.HIGH, KINESIS_SHARD_ID_DOC)
.define(KINESIS_RECORD_LIMIT_CONF, ConfigDef.Type.INT, 500, ConfigDef.Range.between(1, 10000), ConfigDef.Importance.MEDIUM, KINESIS_RECORD_LIMIT_DOC)
.define(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 5000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_EMPTY_RECORDS_BACKOFF_MS_DOC)
.define(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 10 * 1000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_DOC);
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:14,代码来源:KinesisSourceConnectorConfig.java
示例14: sourceOffsets
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void sourceOffsets() throws InterruptedException {
final String SEQUENCE_NUMBER = "asdfasdfddsa";
Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
when(this.kinesisClient.getShardIterator(any())).thenReturn(
new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
);
this.task.start(settings);
GetRecordsResult recordsResult = new GetRecordsResult()
.withNextShardIterator("dsfargadsfasdfasda")
.withRecords(TestData.record())
.withMillisBehindLatest(0L);
when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);
List<SourceRecord> records = this.task.poll();
assertNotNull(records, "records should not be null.");
assertFalse(records.isEmpty(), "records should not be empty.");
verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());
GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest()
.withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
.withShardId(this.config.kinesisShardId)
.withStreamName(this.config.kinesisStreamName)
.withStartingSequenceNumber(SEQUENCE_NUMBER);
verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest);
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:33,代码来源:KinesisSourceTaskTest.java
示例15: shouldReturnIteratorStartingWithSequenceNumber
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
given(kinesis.getShardIterator(new GetShardIteratorRequest()
.withStreamName(STREAM)
.withShardId(SHARD_1)
.withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.withStartingSequenceNumber(SEQUENCE_NUMBER)
)).willReturn(new GetShardIteratorResult()
.withShardIterator(SHARD_ITERATOR));
String stream = underTest.getShardIterator(STREAM, SHARD_1,
ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
开发者ID:apache,项目名称:beam,代码行数:16,代码来源:SimplifiedKinesisClientTest.java
示例16: getShardIterator
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Override
public GetShardIteratorResult getShardIterator(
GetShardIteratorRequest getShardIteratorRequest) {
ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
getShardIteratorRequest.getShardIteratorType());
String shardIterator;
if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
} else {
throw new RuntimeException("Not implemented");
}
return new GetShardIteratorResult().withShardIterator(shardIterator);
}
开发者ID:apache,项目名称:beam,代码行数:16,代码来源:AmazonKinesisMock.java
示例17: getShardIterator
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
/**
* {@inheritDoc}
*/
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
.withStreamName(shard.getStreamName())
.withShardId(shard.getShard().getShardId())
.withShardIteratorType(shardIteratorType);
switch (ShardIteratorType.fromValue(shardIteratorType)) {
case TRIM_HORIZON:
case LATEST:
break;
case AT_TIMESTAMP:
if (startingMarker instanceof Date) {
getShardIteratorRequest.setTimestamp((Date) startingMarker);
} else {
throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
}
break;
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
if (startingMarker instanceof String) {
getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
} else {
throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
}
}
return getShardIterator(getShardIteratorRequest);
}
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:KinesisProxy.java
示例18: doStart
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Override
protected void doStart() throws Exception {
if ((iteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && sequenceNumber.isEmpty()) {
throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
}
super.doStart();
}
开发者ID:HydAu,项目名称:Camel,代码行数:8,代码来源:KinesisEndpoint.java
示例19: onlyRequiredEndpointParams
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void onlyRequiredEndpointParams() throws Exception {
KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ "?amazonKinesisClient=#kinesisClient"
);
assertThat(endpoint.getClient(), is(amazonKinesisClient));
assertThat(endpoint.getStreamName(), is("some_stream_name"));
assertThat(endpoint.getIteratorType(), is(ShardIteratorType.TRIM_HORIZON));
assertThat(endpoint.getMaxResultsPerRequest(), is(1));
}
开发者ID:HydAu,项目名称:Camel,代码行数:12,代码来源:KinesisEndpointTest.java
示例20: afterSequenceNumberRequiresSequenceNumber
import com.amazonaws.services.kinesis.model.ShardIteratorType; //导入依赖的package包/类
@Test
public void afterSequenceNumberRequiresSequenceNumber() throws Exception {
KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name"
+ "?amazonKinesisClient=#kinesisClient"
+ "&iteratorType=AFTER_SEQUENCE_NUMBER"
+ "&shardId=abc"
+ "&sequenceNumber=123"
);
assertThat(endpoint.getClient(), is(amazonKinesisClient));
assertThat(endpoint.getStreamName(), is("some_stream_name"));
assertThat(endpoint.getIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER));
assertThat(endpoint.getShardId(), is("abc"));
assertThat(endpoint.getSequenceNumber(), is("123"));
}
开发者ID:HydAu,项目名称:Camel,代码行数:16,代码来源:KinesisEndpointTest.java
注:本文中的com.amazonaws.services.kinesis.model.ShardIteratorType类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论