本文整理汇总了Java中com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException类的典型用法代码示例。如果您正苦于以下问题:Java ProvisionedThroughputExceededException类的具体用法?Java ProvisionedThroughputExceededException怎么用?Java ProvisionedThroughputExceededException使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ProvisionedThroughputExceededException类属于com.amazonaws.services.kinesis.model包,在下文中一共展示了ProvisionedThroughputExceededException类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: sentWatermark
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
private void sentWatermark() {
try {
//refresh the list of available shards, if current state is too old
if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) {
refreshShards();
lastShardRefreshTime = System.currentTimeMillis();
}
//send a watermark to every shard of the Kinesis stream
shards.parallelStream()
.map(shard -> new PutRecordRequest()
.withStreamName(streamName)
.withData(new WatermarkEvent(currentWatermark).payload)
.withPartitionKey("23")
.withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey()))
.map(kinesisClient::putRecord)
.forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId()));
LOG.debug("send watermark {}", new DateTime(currentWatermark));
} catch (LimitExceededException | ProvisionedThroughputExceededException e) {
//if any request is throttled, just wait for the next iteration to submit another watermark
LOG.warn("skipping watermark due to limit exceeded exception");
}
}
开发者ID:awslabs,项目名称:flink-stream-processing-refarch,代码行数:26,代码来源:WatermarkTracker.java
示例2: throughputExceeded
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Test
public void throughputExceeded() throws InterruptedException {
final String SEQUENCE_NUMBER = "asdfasdfddsa";
Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
when(this.kinesisClient.getShardIterator(any())).thenReturn(
new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
);
this.task.start(settings);
when(this.kinesisClient.getRecords(any())).thenThrow(new ProvisionedThroughputExceededException(""));
List<SourceRecord> records = this.task.poll();
assertNotNull(records, "records should not be null");
assertTrue(records.isEmpty(), "records should be empty.");
verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisThroughputExceededBackoffMs);
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:18,代码来源:KinesisSourceTaskTest.java
示例3: isRecoverableException
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
/**
* Determines whether the exception is recoverable using exponential-backoff.
*
* @param ex Exception to inspect
* @return <code>true</code> if the exception can be recovered from, else
* <code>false</code>
*/
protected static boolean isRecoverableException(AmazonServiceException ex) {
if (ex.getErrorType() == null) {
return false;
}
switch (ex.getErrorType()) {
case Client:
return ex instanceof ProvisionedThroughputExceededException;
case Service:
case Unknown:
return true;
default:
return false;
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:KinesisProxy.java
示例4: getShardIterator
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
/**
* {@inheritDoc}
*/
@Override
public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;
int attempt = 0;
while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
try {
getShardIteratorResult =
kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
} catch (ProvisionedThroughputExceededException ex) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ backoffMillis + " millis.");
Thread.sleep(backoffMillis);
}
}
if (getShardIteratorResult == null) {
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
}
开发者ID:axbaretto,项目名称:flink,代码行数:28,代码来源:KinesisProxy.java
示例5: run
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
private void run(final int events, final OutputFormat format,
final String streamName, final String region) throws Exception {
AmazonKinesis kinesisClient = new AmazonKinesisClient(
new DefaultAWSCredentialsProviderChain());
kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));
int count = 0;
SensorReading r = null;
do {
r = nextSensorReading(format);
try {
PutRecordRequest req = new PutRecordRequest()
.withPartitionKey("" + rand.nextLong())
.withStreamName(streamName)
.withData(ByteBuffer.wrap(r.toString().getBytes()));
kinesisClient.putRecord(req);
} catch (ProvisionedThroughputExceededException e) {
Thread.sleep(BACKOFF);
}
System.out.println(r);
count++;
} while (count < events);
}
开发者ID:awslabs,项目名称:amazon-kinesis-aggregators,代码行数:25,代码来源:SensorReadingProducer.java
示例6: poll
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records;
try {
GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
records = new ArrayList<>(recordsResult.getRecords().size());
log.trace("poll() - {} record(s) returned from shard {}.", this.config.kinesisShardId);
for (Record record : recordsResult.getRecords()) {
SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record);
records.add(sourceRecord);
}
log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
} catch (ProvisionedThroughputExceededException ex) {
log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex);
this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
return new ArrayList<>();
}
if (records.isEmpty()) {
log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
}
return records;
}
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:30,代码来源:KinesisSourceTask.java
示例7: getRecords
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
/**
* {@inheritDoc}
*/
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecordsToGet);
GetRecordsResult getRecordsResult = null;
int attempt = 0;
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (ProvisionedThroughputExceededException ex) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ backoffMillis + " millis.");
Thread.sleep(backoffMillis);
}
}
if (getRecordsResult == null) {
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getRecordsResult;
}
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:KinesisProxy.java
示例8: shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
TransientKinesisException.class);
}
开发者ID:apache,项目名称:beam,代码行数:6,代码来源:SimplifiedKinesisClientTest.java
示例9: shouldHandleProvisionedThroughputExceededExceptionForShardListing
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
TransientKinesisException.class);
}
开发者ID:apache,项目名称:beam,代码行数:6,代码来源:SimplifiedKinesisClientTest.java
示例10: shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
shouldHandleGetBacklogBytesError(new ProvisionedThroughputExceededException(""),
TransientKinesisException.class);
}
开发者ID:apache,项目名称:beam,代码行数:6,代码来源:SimplifiedKinesisClientTest.java
示例11: testIsRecoverableExceptionWithProvisionedThroughputExceeded
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; //导入依赖的package包/类
@Test
public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf");
ex.setErrorType(ErrorType.Client);
assertTrue(KinesisProxy.isRecoverableException(ex));
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:KinesisProxyTest.java
注:本文中的com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论