
Java SendMessageBatchRequest Class Code Examples


This article collects typical usage examples of the Java class com.amazonaws.services.sqs.model.SendMessageBatchRequest. If you are wondering how SendMessageBatchRequest is used in practice, the selected examples below may help.

The SendMessageBatchRequest class belongs to the com.amazonaws.services.sqs.model package. A total of 20 code examples are shown below, sorted by popularity by default.

Example 1: createRequest

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
@VisibleForTesting
static SendMessageBatchRequest createRequest(String queueUrl, Map<String, SendMessageEntry> entries) {
    return new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(entries.entrySet().stream().map(keyValue -> {
                        SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry()
                                .withId(keyValue.getKey())
                                .withMessageBody(keyValue.getValue().getBody());

                        keyValue.getValue().getDelay()
                                .ifPresent((delay) -> entry.setDelaySeconds((int) delay.getSeconds()));

                        return entry;
                    }).collect(Collectors.toList())
            );
}
 
Developer: Bandwidth, Project: async-sqs, Lines of code: 17, Source: SendMessageBatchAction.java
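Example 1 casts the delay's seconds straight to an int. SQS accepts a DelaySeconds value between 0 and 900 (15 minutes), so a caller may want to validate the duration first. The following is a minimal sketch of such a check; the class name SqsDelay and the method toDelaySeconds are hypothetical and are not part of the async-sqs project or the AWS SDK.

```java
import java.time.Duration;

// Hypothetical helper (not from async-sqs or the AWS SDK).
final class SqsDelay {
    /** Largest per-message delay SQS accepts, in seconds (15 minutes). */
    static final int MAX_DELAY_SECONDS = 900;

    private SqsDelay() {}

    /** Converts a Duration to whole seconds, rejecting values outside 0..900. */
    static int toDelaySeconds(Duration delay) {
        long seconds = delay.getSeconds();
        if (seconds < 0 || seconds > MAX_DELAY_SECONDS) {
            throw new IllegalArgumentException(
                "delay must be between 0 and " + MAX_DELAY_SECONDS + " seconds, got " + seconds);
        }
        // Safe narrowing: the range check above guarantees the value fits in an int.
        return (int) seconds;
    }
}
```

In Example 1, the lambda could then call entry.setDelaySeconds(SqsDelay.toDelaySeconds(delay)) instead of casting directly. Whether to reject or clamp an out-of-range delay is a design choice; this sketch rejects it.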


Example 2: testCreateBatches

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests happy path scenario.
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 2 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatches() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(5, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 39, Source: BatchSQSMsgSenderTest.java


Example 3: testCreateBatchesEventWithEmptyBody

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel returns event with empty body.
 * <p>
 * <pre>
 * Inputs:
 *  channel = 1 event with empty body
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 0
 * </pre>
 */
@Test
public void testCreateBatchesEventWithEmptyBody() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    Channel mockChannel = Mockito.mock(Channel.class);
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn("".getBytes());
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(0, batches.size());
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 30, Source: BatchSQSMsgSenderTest.java


Example 4: testCreateBatchesEmptyChannelAfterFirstTake

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty after first event.
 * <p>
 * <pre>
 * Inputs:
 *  channel = 1 Event (Empty after first take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 1
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterFirstTake() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent).thenReturn(null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(1, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 38, Source: BatchSQSMsgSenderTest.java


Example 5: testCreateBatchesEmptyChannelAfterLastTake

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty after the last take for the batch.
 * <p>
 * <pre>
 * Inputs:
 *  channel = 5 Events (Empty after 5th take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterLastTake() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(5, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 38, Source: BatchSQSMsgSenderTest.java


Example 6: testCreateBatchesEmptyChannelInTheMiddle

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is empty in the middle of taking events for the batch
 * <p>
 * <pre>
 * Inputs:
 *  channel = 3 Events (Empty after 3rd take)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelInTheMiddle() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(3, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 38, Source: BatchSQSMsgSenderTest.java


Example 7: testCreateBatchesEmptyEventInTheMiddle

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * channel is not empty but contains events with empty body in the middle of taking events for the batch
 * <p>
 * <pre>
 * Inputs:
 *  channel = 4 Events (3 Events with Body and 4th Event empty)
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *  number of batches = 1
 *  number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyEventInTheMiddle() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    byte[] mockMsgPayload = {'A', 'b'};
    byte[] mockEmptyMsgPayload = {};
    Event mockEvent = Mockito.mock(Event.class);
    Event mockEmptyEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);
    when(mockEmptyEvent.getBody()).thenReturn(mockEmptyMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEmptyEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries);
    Assert.assertEquals(3, msgEntries.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 41, Source: BatchSQSMsgSenderTest.java


Example 8: testSend

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the happy path scenario.
 *
 * @throws Exception
 */
@Test
public void testSend() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);
    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(new SendMessageBatchResult());
    sqsMsgSender.setAmazonSQS(mockSqs);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    sqsMsgSender.send(mockChannel);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 23, Source: BatchSQSMsgSenderTest.java


Example 9: testSendFailureAmazonServiceException

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the failure scenario when AWS
 * SQS API throws AmazonServiceException.
 *
 * @throws Exception
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonServiceException() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);
    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenThrow(AmazonServiceException.class);
    sqsMsgSender.setAmazonSQS(mockSqs);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    sqsMsgSender.send(mockChannel);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 24, Source: BatchSQSMsgSenderTest.java


Example 10: testSendFailureAmazonClientException

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the failure scenario when AWS
 * SQS API throws AmazonClientException.
 *
 * @throws Exception
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonClientException() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);
    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenThrow(AmazonClientException.class);
    sqsMsgSender.setAmazonSQS(mockSqs);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    sqsMsgSender.send(mockChannel);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 24, Source: BatchSQSMsgSenderTest.java


Example 11: shouldSendBatchesInSizeOfTen

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
@Test
public void shouldSendBatchesInSizeOfTen() throws Exception {
    when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mock(SendMessageBatchResult.class));
    ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);

    messagePublisher.postBatch(messageBatch(10), subject);
    verify(mockAmazonSQS, times(1)).sendMessageBatch(captor.capture());
    assertThat(captor.getValue().getEntries()).hasSize(10);

    messagePublisher.postBatch(messageBatch(20), subject);
    verify(mockAmazonSQS, times(3)).sendMessageBatch(any(SendMessageBatchRequest.class));

    messagePublisher.postBatch(messageBatch(11), subject);
    verify(mockAmazonSQS, times(5)).sendMessageBatch(captor.capture());
    assertThat(captor.getValue().getEntries()).hasSize(1);

    messagePublisher.postBatch(messageBatch(9), subject);
    verify(mockAmazonSQS, times(6)).sendMessageBatch(captor.capture());
    assertThat(captor.getValue().getEntries()).hasSize(9);
}
 
Developer: iZettle, Project: izettle-toolbox, Lines of code: 21, Source: QueueServiceSenderTest.java
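The test in Example 11 expects a publisher to split its input into requests of at most ten entries, which matches the SQS limit of 10 entries per SendMessageBatch call. The publisher's own implementation is not shown on this page, so the following is only a sketch of one way to do the splitting; the class name BatchPartitioner and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from izettle-toolbox or the AWS SDK).
final class BatchPartitioner {
    /** SQS accepts at most 10 entries per SendMessageBatch call. */
    static final int MAX_ENTRIES_PER_BATCH = 10;

    private BatchPartitioner() {}

    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the sub-list view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

With this sketch, 11 items yield two chunks (10 and 1) and 9 items yield a single chunk, which is consistent with the call counts that the test verifies. Each chunk would then be turned into one SendMessageBatchRequest.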


Example 12: publishMessages

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
void publishMessages(List<Message> messages) {
	logger.info("Sending {} messages", messages.size());
	SendMessageBatchRequest batch = new SendMessageBatchRequest(queueURL);
	messages.stream().forEach(msg -> {
		SendMessageBatchRequestEntry sendr = new SendMessageBatchRequestEntry(msg.getId(), msg.getPayload());
		batch.getEntries().add(sendr);
	});
	logger.info("sending {}", batch.getEntries().size());
	SendMessageBatchResult result = client.sendMessageBatch(batch);
	logger.info("send result {}", result.getFailed().toString());
}
 
Developer: Netflix, Project: conductor, Lines of code: 12, Source: SQSObservableQueue.java


Example 13: send

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
@Override
public int send(Channel channel) throws EventDeliveryException {
    int eventProcessedCounter = 0;
    // Create batch request
    List<SendMessageBatchRequest> batchRequests = createBatches(channel);

    for (SendMessageBatchRequest batchRequest : batchRequests) {
        // Send batch request
        SendMessageBatchResult result = null;
        try {
            result = this.amazonSQS.sendMessageBatch(batchRequest);
        }
        catch (AmazonServiceException ase) {
            // The request reached SQS but the whole batch was rejected for some reason. Let the whole batch
            // be treated as "failed". Flume will retry the whole batch
            throw new EventDeliveryException("Failure sending batch message request to Amazon SQS, " +
                "the request made it to SQS but was rejected for some reason.", ase);
        }
        catch (AmazonClientException ace) {
            throw new EventDeliveryException("Failure sending batch message request to Amazon SQS.", ace);
        }

        // Handle the result of the SQS batch request i.e., log errors, or fail the whole batch by throwing
        // EventDeliveryException in case of errors etc.
        handleResult(batchRequest, result);

        // The code reached here means there is nothing to rollback in this transaction. So increment the
        // eventProcessedCounter by the number of successfully sent messages.
        eventProcessedCounter += result.getSuccessful().size();
    }
    return eventProcessedCounter;
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 33, Source: BatchSQSMsgSender.java


Example 14: handleResult

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail
 * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS.
 * Currently, this method just logs errors and skips the messages in case some messages from the batch failed
 * to be delivered but some succeeded (i.e., partial batch failure).
 * <p>
 * TODO: Add retry logic instead of letting flume drop the failed messages in case of partial batch failure
 *
 * @param batchRequest The SQS SendMessageBatchRequest
 * @param batchResult The SQS SendMessageBatchResult
 *
 * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS
 */
protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult)
    throws EventDeliveryException {

    List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries();
    List<BatchResultErrorEntry> errors = batchResult.getFailed();

    int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size();
    int errorCount = errors == null ? 0 : errors.size();

    if (errorCount > 0) {
        String errorMessage = buildErrorMessage(batchRequestEntries, errors);

        if (attemptedCount == errorCount) {
            // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole
            // batch and let flume rollback the transaction and retry it
            // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to
            // rollback.
            throw new EventDeliveryException(errorMessage);
        }
        else {
            // TODO: Add retry logic instead of letting flume drop the failed messages in case of partial batch failure

            // Just log the error message and let flume drop failed messages in case of partial batch failures
            LOG.error(errorMessage);
        }
    }
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 41, Source: BatchSQSMsgSender.java
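The TODO in Example 14 leaves partial batch failures unretried. A first step toward retrying would be to pick out the entries whose IDs were reported as failed. The sketch below shows that selection in plain Java, without depending on the AWS SDK; the class FailedEntrySelector and its method are hypothetical and are not part of flume-ng-aws-sqs-sink.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper (not from flume-ng-aws-sqs-sink or the AWS SDK).
final class FailedEntrySelector {
    private FailedEntrySelector() {}

    /**
     * Returns the entries whose ID appears in failedIds, preserving the original order.
     * idOf extracts the batch-entry ID from an entry.
     */
    static <T> List<T> selectFailed(List<T> sentEntries,
                                    Function<T, String> idOf,
                                    Collection<String> failedIds) {
        Set<String> failed = new HashSet<>(failedIds);
        List<T> toRetry = new ArrayList<>();
        for (T entry : sentEntries) {
            if (failed.contains(idOf.apply(entry))) {
                toRetry.add(entry);
            }
        }
        return toRetry;
    }
}
```

With the AWS SDK types used in Example 14, idOf could be SendMessageBatchRequestEntry::getId, and the failed IDs could be collected from BatchResultErrorEntry.getId() over batchResult.getFailed(). The selected entries would then go into a new SendMessageBatchRequest. A real retry loop would also need an attempt limit, and it may be worth skipping entries that SQS flags as sender faults, since resending them unchanged is unlikely to succeed.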


Example 15: testInvalidCharacters

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests invalid characters not
 * allowed by SQS. See
 * [http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html]
 * for list of valid characters allowed by SQS.
 * <p>
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty. with messages containing invalid characters.
 *
 * Expected Output:
 *   The sink messages should not contain invalid characters
 * </pre>
 */
@Test
public void testInvalidCharacters() throws Exception {
    // See
    // http://stackoverflow.com/questions/16688523/aws-sqs-valid-characters
    // http://stackoverflow.com/questions/1169754/amazon-sqs-invalid-binary-character-in-message-body
    // https://forums.aws.amazon.com/thread.jspa?messageID=459090
    // http://stackoverflow.com/questions/16329695/invalid-binary-character-when-transmitting-protobuf-net-messages-over-aws-sqs
    byte invalidCharByte = 0x1C;
    String mockMsg = "Test with some invalid chars at the end 0%2F>^F";
    byte[] origPayloadWithInvalidChars = ArrayUtils.add(mockMsg.getBytes(), invalidCharByte);

    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 1,
            origPayloadWithInvalidChars.length);

    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(origPayloadWithInvalidChars);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries();
    assertCorrectPayloadInEntries(new String(origPayloadWithInvalidChars).trim().getBytes(), msgEntries);

    // Make sure that the message being sent by the sink doesn't contain the invalid characters
    for (SendMessageBatchRequestEntry entry : msgEntries) {
        Assert.assertNotNull(entry);
        Assert.assertTrue(ArrayUtils.contains(new String(origPayloadWithInvalidChars).getBytes(), invalidCharByte));
        Assert.assertFalse(ArrayUtils.contains(entry.getMessageBody().getBytes(), invalidCharByte));
    }
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 50, Source: BatchSQSMsgSenderTest.java
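Example 15 checks that the sink removes characters SQS rejects, but the sink's own cleaning code is not shown on this page. The SQS API reference lists the permitted code points for a message body as #x9, #xA, #xD, #x20 to #xD7FF, #xE000 to #xFFFD, and #x10000 to #x10FFFF. The following sketch filters a string down to that set; the class SqsBodySanitizer is hypothetical and may differ from what BatchSQSMsgSender actually does.

```java
// Hypothetical helper (not from flume-ng-aws-sqs-sink or the AWS SDK).
final class SqsBodySanitizer {
    private SqsBodySanitizer() {}

    /** True if the code point is in the set SQS documents as valid for a message body. */
    static boolean isAllowed(int cp) {
        return cp == 0x9 || cp == 0xA || cp == 0xD
            || (cp >= 0x20 && cp <= 0xD7FF)
            || (cp >= 0xE000 && cp <= 0xFFFD)
            || (cp >= 0x10000 && cp <= 0x10FFFF);
    }

    /** Returns body with every disallowed code point removed. */
    static String stripInvalid(String body) {
        StringBuilder out = new StringBuilder(body.length());
        body.codePoints()
            .filter(SqsBodySanitizer::isAllowed)
            .forEach(out::appendCodePoint);
        return out.toString();
    }
}
```

The byte 0x1C used in the test is a control character below 0x20, so this filter would drop it while keeping tabs and line breaks. Dropping characters silently changes the payload; replacing them with a placeholder or Base64-encoding binary data are alternatives worth considering.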


Example 16: testCreateBatchesExceedingSize

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * specified <i>batchSize</i> cannot fit into the specified <i>maxMessageSize</i>
 * <p>
 * <pre>
 * Inputs:
 *  channel = never empty
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 3 Bytes
 *
 * Expected Output:
 *  number of batches = 2
 *  number of messages in batch 1 = 3
 *  number of messages in batch 2 = 2
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSize() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'A', 'b', '~'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries1 = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries1);
    Assert.assertEquals(3, msgEntries1.size());

    List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
    Assert.assertNotNull(msgEntries2);
    Assert.assertEquals(2, msgEntries2.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries2);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 45, Source: BatchSQSMsgSenderTest.java
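Example 16 expects five 3-byte messages with a 10-byte limit to be split into batches of 3 and 2. The createBatches implementation itself does not appear on this page, so the sketch below only illustrates a greedy grouping that would produce those counts. The class SizeBoundedBatcher is hypothetical, and it assumes the limit is inclusive, as the earlier test with five 2-byte messages in one 10-byte batch suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from flume-ng-aws-sqs-sink or the AWS SDK).
final class SizeBoundedBatcher {
    private SizeBoundedBatcher() {}

    /**
     * Greedily groups payloads in order so that each group's total size
     * stays at or below maxBytesPerGroup. A single payload larger than the
     * limit ends up alone in its own group rather than being dropped.
     */
    static List<List<byte[]>> group(List<byte[]> payloads, int maxBytesPerGroup) {
        List<List<byte[]>> groups = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] payload : payloads) {
            // Close the current group if adding this payload would exceed the limit.
            if (!current.isEmpty() && currentBytes + payload.length > maxBytesPerGroup) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

A production version would likely also cap each group at the SQS limit of 10 entries and decide explicitly how to handle a payload that exceeds the size limit on its own.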


Example 17: testCreateBatchesExceedingSizeLimitedChannel

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests the case when the
 * specified <i>batchSize</i> can not fit into the specified <i>maxMessageSize</i> and channel gets empty after
 * certain number of events "takes".
 * <p>
 * <pre>
 * Inputs:
 *  channel = 4 Events
 *  batchSize = 5
 *  maxMessageSize = 10 Bytes
 *  each message size = 3 Bytes
 *
 * Expected Output:
 *  number of batches = 2
 *  number of messages in batch 1 = 3
 *  number of messages in batch 2 = 1
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSizeLimitedChannel() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    byte[] mockMsgPayload = {'^', '@', '~'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent, mockEvent, mockEvent, mockEvent, null);

    List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> msgEntries1 = batches.get(0).getEntries();
    Assert.assertNotNull(msgEntries1);
    Assert.assertEquals(3, msgEntries1.size());

    List<SendMessageBatchRequestEntry> msgEntries2 = batches.get(1).getEntries();
    Assert.assertNotNull(msgEntries2);
    Assert.assertEquals(1, msgEntries2.size());

    assertCorrectPayloadInEntries(mockMsgPayload, msgEntries2);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 46, Source: BatchSQSMsgSenderTest.java


Example 18: testSendPartialBatchFailure

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the failure scenario when
 * certain messages in the batch failed to be delivered to SQS.
 * <p>
 * <pre>
 * Expected:
 * - No EventDeliveryException is thrown
 * - The BatchSQSMsgSender returns successfully processed events count
 * </pre>
 *
 * @throws Exception
 */
@Test
public void testSendPartialBatchFailure() throws Exception {
    int batchSize = 5;
    int failedMsgCount = 1;
    int expectedSuccessCount = batchSize - failedMsgCount;

    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey",
            batchSize, 100);
    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);

    SendMessageBatchResult mockResult = mockBatchResult(batchSize, expectedSuccessCount);

    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mockResult);
    sqsMsgSender.setAmazonSQS(mockSqs);

    String msgBody = "Some message payload";
    byte[] mockMsgPayload = msgBody.getBytes();
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    int successCount = sqsMsgSender.send(mockChannel);

    Assert.assertEquals(expectedSuccessCount, successCount);
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 41, Source: BatchSQSMsgSenderTest.java


Example 19: testSendCompleteBatchFailure

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the failure scenario when all
 * the messages in the batch failed to be delivered to SQS.
 * <p>
 * Expected: - EventDeliveryException is thrown - EventDeliveryException also contains the failed messages payload
 * in the exception message
 *
 * @throws Exception
 */
@Test(expected = EventDeliveryException.class)
public void testSendCompleteBatchFailure() throws Exception {
    int batchSize = 5;
    int failedMsgCount = batchSize;
    int expectedSuccessCount = batchSize - failedMsgCount;

    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey",
            batchSize, 100);
    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);

    SendMessageBatchResult mockResult = mockBatchResult(batchSize, expectedSuccessCount);

    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mockResult);
    sqsMsgSender.setAmazonSQS(mockSqs);

    String msgBody = "Some message payload";
    byte[] mockMsgPayload = msgBody.getBytes();
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    try {
        sqsMsgSender.send(mockChannel);
    }
    catch (EventDeliveryException ede) {
        // Make sure that the original payload is also part of the exception error message body
        // to get the failed payloads logged along with errors
        Assert.assertTrue(ede.getMessage().contains(msgBody));
        //rethrow as the test is expecting this exception to be thrown
        throw ede;
    }
}
 
Developer: dpandya, Project: flume-ng-aws-sqs-sink, Lines of code: 45, Source: BatchSQSMsgSenderTest.java


Example 20: main

import com.amazonaws.services.sqs.model.SendMessageBatchRequest; // import the required package/class
public static void main(String[] args)
{
    final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    try {
        CreateQueueResult create_result = sqs.createQueue(QUEUE_NAME);
    } catch (AmazonSQSException e) {
        if (!e.getErrorCode().equals("QueueAlreadyExists")) {
            throw e;
        }
    }

    String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();

    SendMessageRequest send_msg_request = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody("hello world")
            .withDelaySeconds(5);
    sqs.sendMessage(send_msg_request);


    // Send multiple messages to the queue
    SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(
                    new SendMessageBatchRequestEntry(
                            "msg_1", "Hello from message 1"),
                    new SendMessageBatchRequestEntry(
                            "msg_2", "Hello from message 2")
                            .withDelaySeconds(10));
    sqs.sendMessageBatch(send_batch_request);

    // receive messages from the queue
    List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();

    // delete messages from the queue
    for (Message m : messages) {
        sqs.deleteMessage(queueUrl, m.getReceiptHandle());
    }
}
 
Developer: awsdocs, Project: aws-doc-sdk-examples, Lines of code: 41, Source: SendReceiveMessages.java



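Note: The com.amazonaws.services.sqs.model.SendMessageBatchRequest examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.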

