A low level Pub/Sub client and a concurrent per-topic batching Publisher.
The client uses async-http-client with the Netty provider for making efficient and async HTTP requests to the Google Cloud Pub/Sub api.
The publisher is implemented on top of the async Pub/Sub client and concurrently gathers individual messages into per-topic batches which are then pushed to Google Cloud Pub/Sub at a specified desired request concurrency level in order to achieve both low-latency and high throughput.
Why
The official Google Cloud Pub/Sub client library was not performant enough for our purposes due to blocking I/O etc.
Usage
Pubsub Client
// Create a topicpubsub.createTopic("my-google-cloud-project", "the-topic").get();
// Create a subscriptionpubsub.createSubscription("my-google-cloud-project", "the-subscription-name", "the-topic").get();
// Create a batch of messagesfinalList<Message> messages = asList(
Message.builder()
.attributes("type", "foo")
.data(encode("hello foo"))
.build(),
Message.builder()
.attributes("type", "bar")
.data(encode("hello foo"))
.build());
// Publish the messagesfinalList<String> messageIds = pubsub.publish("my-google-cloud-project", "the-topic", messages).get();
System.out.println("Message IDs: " + messageIds);
// Pull the messagefinalList<ReceivedMessage> received = pubsub.pull("my-google-cloud-project", "the-subscription").get();
System.out.println("Received Messages: " + received);
// Ack the received messagesfinalList<String> ackIds = received.stream().map(ReceivedMessage::ackId).collect(Collectors.toList());
pubsub.acknowledge("my-google-cloud-project", "the-subscription", ackIds).get();
Publisher
finalPubsubpubsub = Pubsub.builder()
.build();
finalPublisherpublisher = Publisher.builder()
.pubsub(pubsub)
.project("my-google-cloud-project")
.concurrency(128)
.build();
// A never ending stream of messages...finalIterable<MessageAndTopic> messageStream = incomingMessages();
// Publish incoming messagesmessageStream.forEach(m -> publisher.publish(m.topic, m.message));
Have your GnuPG password ready. Both prepare and perform steps will ask you for it.
Note: The current tests run during both prepare and perform include
integration tests against the real Google Pub/Sub API. Verify
that you have a suitable default project and credentials
configured with the gcloud cli.
Tag and push a new release to github:
mvn release:prepare
Publish the signed jar to maven central:
mvn release:perform
Todo
Implement a high level consumer (raw pull/ack support is there)
请发表评论