在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:nats-io/nats.java开源软件地址:https://github.com/nats-io/nats.java开源编程语言:Java 100.0%开源软件介绍:NATS - Java ClientA Java client for the NATS messaging system. A Note on VersionsThis is version 2.x of the java-nats library. This version is a ground up rewrite of the original library. Part of the goal of this re-write was to address the excessive use of threads, we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies. The API is simple to use and highly performant. Version 2+ uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2 to allow clients to limit there use of this new API. With the addition of drain() we updated to 2.1, NKey support moved us to 2.2. The NATS server renamed itself from gnatsd to nats-server around 2.4.4. This and other files try to use the new names, but some underlying code may change over several versions. If you are building yourself, please keep an eye out for issues and report them. Version 2.5.0 adds some back pressure to publish calls to alleviate issues when there is a slow network. This may alter performance characteristics of publishing apps, although the total performance is equivalent. Previous versions are still available in the repo. Versions 2.11.6 and server versionsVersion 2.11.6 is the last java-nats version which is supported to work with server v2.3.4 and earlier.
It will not be officially supported to work with servers after v2.3.4, but should be fine if you don't use
the queue behavior advertised in example code If you want to take advantage of the fixes and features provided in the server after v2.3.4, you must upgrade to the release version 2.12.0 or later. SSL/TLS PerformanceAfter recent tests we realized that TLS performance is lower than we would like. After researching the problem and possible solutions we came to a few conclusions:
To use conscrypt or wildfly, you will need to add the appropriate jars to your class path and create an SSL context manually. This context can be passed to the Options used when creating a connection. The NATSAutoBench example provides a conscrypt flag which can be used to try out the library, manually including the jar is required. OCSP StaplingOur server now supports OCSP stapling. To enable Java to automatically check the stapling when making TLS connections, you must set system properties. This can be done from your command line or from your Java code:
For more information, see the Oracle Java documentation page on Client-Driven OCSP and OCSP Stapling Also, there is a detailed OCSP Example that shows how to create SSL contexts enabling OCSP stapling. UTF-8 SubjectsThe client protocol spec doesn't explicitly state the encoding on subjects. Some clients use ASCII and some use UTF-8 which matches ASCII for a-Z and 0-9. Until 2.1.2 the 2.0+ version of the Java client used ASCII for performance reasons. As of 2.1.2 you can choose to support UTF-8 subjects via the Options. Keep in mind that there is a small performance penalty for UTF-8 encoding and decoding in benchmarks, but depending on your application this cost may be negligible. Also, keep in mind that not all clients support UTF-8 and test accordingly. NKey-based Challenge Response AuthenticationThe NATS server is adding support for a challenge response authentication scheme based on NKeys. Version 2.2.0 of the Java client supports this scheme via an AuthHandler interface. Version 2.3.0 replaced several NKey methods that used strings with methods using char[] to improve security. InstallationThe java-nats client is provided in a single jar file, with a single external dependency for the encryption in NKey support. See Building From Source for details on building the library. Downloading the JarYou can download the latest jar at https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4.jar. The examples are available at https://search.maven.org/remotecontent?filepath=io/nats/jnats/2.15.4/jnats-2.15.4-examples.jar. To use NKeys, you will need the ed25519 library, which can be downloaded at https://repo1.maven.org/maven2/net/i2p/crypto/eddsa/0.3.0/eddsa-0.3.0.jar. Using GradleThe NATS client is available in the Maven central repository, and can be imported as a standard dependency in your dependencies {
implementation 'io.nats:jnats:2.15.4'
} If you need the latest and greatest before Maven central updates, you can use: repositories {
jcenter()
maven {
url "https://oss.sonatype.org/content/repositories/releases"
}
} If you need a snapshot version, you must add the url for the snapshots and change your dependency. repositories {
...
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}
dependencies {
implementation 'io.nats:jnats:2.15.4-SNAPSHOT'
} Using MavenThe NATS client is available on the Maven central repository, and can be imported as a normal dependency in your pom.xml file: <dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.15.4</version>
</dependency> If you need the absolute latest, before it propagates to maven central, you can use the repository: <repositories>
<repository>
<id>sonatype releases</id>
<url>https://oss.sonatype.org/content/repositories/releases</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories> If you need a snapshot version, you must enable snapshots and change your dependency. <repositories>
<repository>
<id>sonatype snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.15.4-SNAPSHOT</version>
</dependency> If you are using the 1.x version of java-nats and don't want to upgrade to 2.0.0 please use ranges in your POM file, java-nats-streaming 1.x is using [1.1, 1.9.9) for this. Basic UsageSending and receiving with NATS is as simple as connecting to the nats-server and publishing or subscribing for messages. A number of examples are provided in this repo as described in the Examples Readme. ConnectingThere are four different ways to connect using the Java library:
PublishingOnce connected, publishing is accomplished via one of three methods:
All of these methods, as well as the incoming message code use byte arrays for maximum flexibility. Applications can send JSON, Strings, YAML, Protocol Buffers, or any other format through NATS to applications written in a wide range of languages. ReplyTo When Making A RequestThe Message object allows you to set a replyTo, but in requests, the replyTo is reserved for internal use as the address for the server to respond to the client with the consumer's reply. Listening for Incoming MessagesThe Java NATS library provides two mechanisms to listen for messages, three if you include the request/reply discussed above.
JetStreamPublishing and subscribing to JetStream enabled servers is straightforward. A JetStream enabled application will connect to a server, establish a JetStream context, and then publish or subscribe. This can be mixed and matched with standard NATS subject, and JetStream subscribers, depending on configuration, receive messages from both streams and directly from other NATS producers. The JetStream ContextAfter establishing a connection as described above, create a JetStream Context. JetStream js = nc.JetStream(); You can pass options to configure the JetStream client, although the defaults should
suffice for most users. See the There is no limit to the number of contexts used, although normally one would only require a single context. Contexts may be prefixed to be used in conjunction with NATS authorization. PublishingTo publish messages, use the Synchronous: // create a typical NATS message
Message msg = NatsMessage.builder()
.subject("foo")
.data("hello", StandardCharsets.UTF_8)
.build();
PublishAck pa = js.publish(msg); See If there is a problem an exception will be thrown, and the message may not have been persisted. Otherwise, the stream name and sequence number is returned in the publish acknowledgement. There are a variety of publish options that can be set when publishing. When duplicate checking has been enabled on the stream, a message ID should be set. One set of options are expectations. You can set a publish expectation such as a particular stream name, previous message ID, or previous sequence number. These are hints to the server that it should reject messages where these are not met, primarily for enforcing your ordering or ensuring messages are not stored on the wrong stream. The PublishOptions are immutable, but the builder an be re-used for expectations by clearing the expected. For example: PublishOptions.Builder pubOptsBuilder = PublishOptions.builder()
.expectedStream("TEST")
.messageId("mid1");
PublishAck pa = js.publish("foo", null, pubOptsBuilder.build());
pubOptsBuilder.clearExpected()
.setExpectedLastMsgId("mid1")
.setExpectedLastSequence(1)
.messageId("mid2");
pa = js.publish("foo", null, pubOptsBuilder.build()); See Asynchronous: List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
for (int x = 1; x < roundCount; x++) {
// create a typical NATS message
Message msg = NatsMessage.builder()
.subject("foo")
.data("hello", StandardCharsets.UTF_8)
.build();
// Publish a message
futures.add(js.publishAsync(msg));
}
for (CompletableFuture<PublishAck> future : futures) {
... process the futures
} See the ReplyTo When PublishingThe Message object allows you to set a replyTo, but in publish requests, the replyTo is reserved for internal use as the address for the server to respond to the client with the PublishAck. SubscribingThere are two methods of subscribing, Push and Pull with each variety having its own set of options and abilities. Push SubscribingPush subscriptions can be synchronous or asynchronous. The server pushes messages to the client. Asynchronous: Dispatcher disp = ...;
MessageHandler handler = (msg) -> {
// Process the message.
// Ack the message depending on the ack model
};
PushSubscribeOptions so = PushSubscribeOptions.builder()
.durable("optional-durable-name")
.build();
boolean autoAck = ...
js.subscribe("my-subject", disp, handler, autoAck); See the Synchronous: See PushSubscribeOptions so = PushSubscribeOptions.builder()
.durable("optional-durable-name")
.build();
// Subscribe synchronously, then just wait for messages.
JetStreamSubscription sub = js.subscribe("subject", so);
nc.flush(Duration.ofSeconds(5));
Message msg = sub.nextMessage(Duration.ofSeconds(1)); Pull SubscribingPull subscriptions are always synchronous. The server organizes messages into a batch which it sends when requested. PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
.durable("durable-name-is-required")
.build();
JetStreamSubscription sub = js.subscribe("subject", pullOptions); Fetch: List<Message> message = sub.fetch(100, Duration.ofSeconds(1));
for (Message m : messages) {
// process message
m.ack();
} The fetch pull is a macro pull that uses advanced pulls under the covers to return a list of messages. The list may be empty or contain at most the batch size. All status messages are handled for you. The client can provide a timeout to wait for the first message in a batch. The fetch call returns when the batch is ready. The timeout may be exceeded if the server sent messages very near the end of the timeout period. See Iterate: Iterator<Message> iter = sub.iterate(100, Duration.ofSeconds(1));
while (iter.hasNext()) {
Message m = iter.next();
// process message
m.ack();
} The iterate pull is a macro pull that uses advanced pulls under the covers to return an iterator. The iterator may have no messages up to at most the batch size. All status messages are handled for you. The client can provide a timeout to wait for the first message in a batch. The iterate call returns the iterator immediately, but under the covers it will wait for the first message based on the timeout. The timeout may be exceeded if the server sent messages very near the end of the timeout period. See Batch Size: sub.pull(100);
...
Message m = sub.nextMessage(Duration.ofSeconds(1)); An advanced version of pull specifies a batch size. When asked, the server will send whatever messages it has up to the batch size. If it has no messages it will wait until it has some to send. The client may time out before that time. If there are less than the batch size available, you can ask for more later. Once the entire batch size has been filled, you must make another pull request. See No Wait and Batch Size: sub.pullNoWait(100);
...
Message m = sub.nextMessage(Duration.ofSeconds(1)); An advanced version of pull also specifies a batch size. When asked, the server will send whatever messages it has up to the batch size, but will never wait for the batch to fill and the client will return immediately. If there are less than the batch size available, you will get what is available and a 404 status message indicating the server did not have enough messages. You must make a pull request every time. This is an advanced api See the Expires In and Batch Size: sub.pullExpiresIn(100, Duration.ofSeconds(3));
...
Message m = sub.nextMessage(Duration.ofSeconds(4)); Another advanced version of pull specifies a maximum time to wait for the batch to fill. The server returns messages when either the batch is filled or the time expires. It's important to set your client's timeout to be longer than the time you've asked the server to expire in. You must make a pull request every time. In subsequent pulls, you will receive multiple 408 status messages, one for each message the previous batch was short. You can just ignore these. This is an advanced api See Ordered Push Subscription OptionYou can now set a Push Subscription option called "Ordered". When you set this flag, library will take over creation of the consumer and create a subscription that guarantees the order of messages. This consumer will use flow control with a default heartbeat of 5 seconds. Messages will not require acks as the Ack Policy will be set to No Ack. When creating the subscription, there are some restrictions for the consumer configuration settings.
You can however set the deliver policy which will be used to start the subscription. |
请发表评论