BAEL-1545 - Nats Tutorial (#3904)

This commit is contained in:
Eric Goebelbecker 2018-03-29 01:06:24 -04:00 committed by maibin
parent 5b00f0e0e3
commit 48b8d2c8a5
3 changed files with 273 additions and 0 deletions

View File

@ -114,6 +114,13 @@
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>${jnats.version}</version>
</dependency>
<dependency>
<groupId>rome</groupId>
<artifactId>rome</artifactId>
@ -797,6 +804,8 @@
<commons.dbutils.version>1.6</commons.dbutils.version>
<h2.version>1.4.196</h2.version>
<jetty.version>9.4.8.v20171121</jetty.version>
<jnats.version>1.0</jnats.version>
<httpclient.version>4.5.3</httpclient.version>
<commons.io.version>2.5</commons.io.version>
<flink.version>1.2.0</flink.version>

View File

@ -0,0 +1,150 @@
package com.baeldung.jnats;
import io.nats.client.*;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NatsClient {
private String serverURI;
private Connection natsConnection;
private Map<String, Subscription> subscriptions = new HashMap<>();
private final static Logger log = LoggerFactory.getLogger(NatsClient.class);
public NatsClient() {
this.serverURI = "jnats://localhost:4222";
natsConnection = initConnection(serverURI);
}
public NatsClient(String serverURI) {
if ((serverURI != null) && (!serverURI.isEmpty())) {
this.serverURI = serverURI;
} else {
this.serverURI = "jnats://localhost:4222";
}
natsConnection = initConnection(serverURI);
}
public void closeConnection() {
// Close connection
natsConnection.close();
}
private Connection initConnection(String uri) {
try {
Options options = new Options.Builder()
.errorCb(new ExceptionHandler() {
@Override
public void onException(NATSException ex) {
log.error("Connection Exception: ", ex);
}
})
.disconnectedCb(new DisconnectedCallback() {
@Override
public void onDisconnect(ConnectionEvent event) {
log.error("Channel disconnected: {}", event.getConnection());
}
})
.reconnectedCb(new ReconnectedCallback() {
@Override
public void onReconnect(ConnectionEvent event) {
log.error("Reconnected to server: {}", event.getConnection());
}
})
.build();
return Nats.connect(uri, options);
} catch (IOException ioe) {
log.error("Error connecting to NATs! ", ioe);
return null;
}
}
public void publishMessage(String topic, String replyTo, String message) {
try {
// Simple Publisher
natsConnection.publish(topic, replyTo, message.getBytes());
} catch (IOException ioe) {
log.error("Error publishing message: {} to {} ", message, topic, ioe);
}
}
public void subscribeAsync(String topic) {
// Simple Async Subscriber
AsyncSubscription subscription = natsConnection.subscribe(topic, new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
if (subscription == null) {
log.error("Error subscribing to {}", topic);
} else {
subscriptions.put(topic, subscription);
}
}
public SyncSubscription subscribeSync(String topic) {
// Simple Sync Subscriber
return natsConnection.subscribe(topic);
}
public void unsubscribe(String topic) {
try {
Subscription subscription = subscriptions.get(topic);
if (subscription != null) {
subscription.unsubscribe();
} else {
log.error("{} not found. Unable to unsubscribe.", topic);
}
} catch (IOException ioe) {
log.error("Error unsubscribing from {} ", topic, ioe);
}
}
public Message makeRequest(String topic, String request) {
try {
return natsConnection.request(topic, request.getBytes(), 100);
} catch (IOException | InterruptedException ioe) {
log.error("Error making request {} to {} ", topic, request, ioe);
return null;
}
}
public void installReply(String topic, String reply) {
natsConnection.subscribe(topic, message -> {
try {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
});
}
public SyncSubscription joinQueueGroup(String topic, String queue) {
return natsConnection.subscribe(topic, queue);
}
}

View File

@ -0,0 +1,114 @@
package com.baeldung.jnats;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class NatsClientLiveTest {
@Test
public void givenMessageExchange_MessagesReceived() throws Exception {
NatsClient client = connectClient();
SyncSubscription fooSubscription = client.subscribeSync("foo.bar");
SyncSubscription barSubscription = client.subscribeSync("bar.foo");
client.publishMessage("foo.bar", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
client.publishMessage(message.getReplyTo(), message.getSubject(), "hello back");
message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));
}
private NatsClient connectClient() {
return new NatsClient();
}
@Test
public void whenWildCardSubscription_andMatchTopic_MessageReceived() throws Exception {
NatsClient client = connectClient();
SyncSubscription fooSubscription = client.subscribeSync("foo.*");
client.publishMessage("foo.bar", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
}
@Test
public void whenWildCardSubscription_andNotMatchTopic_NoMessageReceived() throws Exception {
NatsClient client = connectClient();
SyncSubscription fooSubscription = client.subscribeSync("foo.*");
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);
SyncSubscription barSubscription = client.subscribeSync("foo.>");
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
}
@Test
public void givenRequest_ReplyReceived() {
NatsClient client = connectClient();
client.installReply("salary.requests", "denied!");
Message reply = client.makeRequest("salary.requests", "I need a raise.");
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));
}
@Test
public void givenQueueMessage_OnlyOneReceived() throws Exception {
NatsClient client = connectClient();
SyncSubscription queue1 = client.joinQueueGroup("foo.bar.requests", "queue1");
SyncSubscription queue2 = client.joinQueueGroup("foo.bar.requests", "queue1");
client.publishMessage("foo.bar.requests", "queuerequestor", "foobar");
List<Message> messages = new ArrayList<>();
Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);
message = queue2.nextMessage(200);
if (message != null) messages.add(message);
assertEquals(1, messages.size());
}
}