[AMQ-6859] MQTT - topic name of the message

This commit is contained in:
Dejan Bosanac 2017-11-10 15:15:30 +01:00
parent 96ce14b278
commit a0a23b99cc
2 changed files with 40 additions and 21 deletions

View File

@ -106,7 +106,7 @@ public class MQTTProtocolConverter {
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); private final Map<ActiveMQDestination, String> mqttTopicMap = new LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE);
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
@ -594,11 +594,15 @@ public class MQTTProtocolConverter {
String topicName; String topicName;
synchronized (mqttTopicMap) { synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination()); ActiveMQDestination destination = message.getDestination();
if (destination.isPattern() && message.getOriginalDestination() != null) {
destination = message.getOriginalDestination();
}
topicName = mqttTopicMap.get(destination);
if (topicName == null) { if (topicName == null) {
String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); String amqTopicName = findSubscriptionStrategy().onSend(destination);
topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
mqttTopicMap.put(message.getJMSDestination(), topicName); mqttTopicMap.put(destination, topicName);
} }
} }
result.topicName(new UTF8Buffer(topicName)); result.topicName(new UTF8Buffer(topicName));

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -160,7 +161,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
// One delivery for topic ACCOUNT_PREFIX + "#" // One delivery for topic ACCOUNT_PREFIX + "#"
String result = listener.messageQ.poll(20, TimeUnit.SECONDS); String result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
@ -168,10 +169,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
// One delivery for topic ACCOUNT_PREFIX + "a/1/2" // One delivery for topic ACCOUNT_PREFIX + "a/1/2"
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
// One delivery for topic ACCOUNT_PREFIX + "#" // One delivery for topic ACCOUNT_PREFIX + "#"
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
@ -183,7 +184,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
// One delivery for topic ACCOUNT_PREFIX + "1/2/3" // One delivery for topic ACCOUNT_PREFIX + "1/2/3"
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
@ -208,13 +209,13 @@ public class PahoMQTTTest extends MQTTTestSupport {
String expectedResult = "hello mqtt broker on hash"; String expectedResult = "hello mqtt broker on hash";
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
String result = listener.messageQ.poll(20, TimeUnit.SECONDS); String result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on a different topic"; expectedResult = "hello mqtt broker on a different topic";
client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
@ -229,18 +230,18 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
// One message from topic subscription on ACCOUNT_PREFIX + "#" // One message from topic subscription on ACCOUNT_PREFIX + "#"
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
// One message from topic subscription on ACCOUNT_PREFIX + "1/2/3" // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3"
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on some other topic"; expectedResult = "hello mqtt broker on some other topic";
client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
result = listener.messageQ.poll(20, TimeUnit.SECONDS); result = listener.messageQ.poll(20, TimeUnit.SECONDS).getValue();
assertEquals(expectedResult, result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
@ -252,14 +253,12 @@ public class PahoMQTTTest extends MQTTTestSupport {
expectedResult = "this should not come back..."; expectedResult = "this should not come back...";
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
result = listener.messageQ.poll(3, TimeUnit.SECONDS); assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS));
assertNull(result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back either..."; expectedResult = "this should not come back either...";
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
result = listener.messageQ.poll(3, TimeUnit.SECONDS); assertNull(listener.messageQ.poll(3, TimeUnit.SECONDS));
assertNull(result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
client.disconnect(); client.disconnect();
@ -351,11 +350,11 @@ public class PahoMQTTTest extends MQTTTestSupport {
String message = "Message from client: " + clientId; String message = "Message from client: " + clientId;
client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false); client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false);
String result = client1MqttCallback.messageQ.poll(10, TimeUnit.SECONDS); String result = client1MqttCallback.messageQ.poll(10, TimeUnit.SECONDS).getValue();
assertEquals(message, result); assertEquals(message, result);
assertEquals(1, client1MqttCallback.received.get()); assertEquals(1, client1MqttCallback.received.get());
result = clientAdminMqttCallback.messageQ.poll(10, TimeUnit.SECONDS); result = clientAdminMqttCallback.messageQ.poll(10, TimeUnit.SECONDS).getValue();
assertEquals(message, result); assertEquals(message, result);
assertTrue(client1.isConnected()); assertTrue(client1.isConnected());
@ -384,6 +383,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
clientAdmin.close(); clientAdmin.close();
} }
@Test(timeout = 300000)
public void testActiveMQWildCards1() throws Exception {
final DefaultListener listener = new DefaultListener();
MqttClient client = createClient(false, "receive", listener);
final String ACCOUNT_PREFIX = "test/";
client.subscribe(ACCOUNT_PREFIX+"a/#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "should get this 1";
String topic = ACCOUNT_PREFIX+"a/b/1.2.3*4>";
client.publish(topic, expectedResult.getBytes(), 0, false);
AbstractMap.SimpleEntry<String,String> entry = listener.messageQ.poll(20, TimeUnit.SECONDS);
assertEquals(topic, entry.getKey());
assertEquals(expectedResult, entry.getValue());
assertTrue(client.getPendingDeliveryTokens().length == 0);
}
protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(cleanSession); options.setCleanSession(cleanSession);
@ -427,7 +442,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
static class DefaultListener implements MqttCallback { static class DefaultListener implements MqttCallback {
final AtomicInteger received = new AtomicInteger(); final AtomicInteger received = new AtomicInteger();
final BlockingQueue<String> messageQ = new ArrayBlockingQueue<String>(10); final BlockingQueue<AbstractMap.SimpleEntry<String, String>> messageQ = new ArrayBlockingQueue<AbstractMap.SimpleEntry<String, String>>(10);
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
@ -437,7 +452,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) throws Exception {
LOG.info("Received: {}", message); LOG.info("Received: {}", message);
received.incrementAndGet(); received.incrementAndGet();
messageQ.put(new String(message.getPayload(), StandardCharsets.UTF_8)); messageQ.put(new AbstractMap.SimpleEntry(topic, new String(message.getPayload(), StandardCharsets.UTF_8)));
} }
@Override @Override