ARTEMIS-3871 fix MQTT shared sub q naming semantics
This commit is contained in:
parent
ef67ea0e26
commit
ba2cbddd6b
|
@ -306,7 +306,7 @@ public class MQTTSubscriptionManager {
|
||||||
int slashIndex = topic.indexOf(SLASH) + 1;
|
int slashIndex = topic.indexOf(SLASH) + 1;
|
||||||
String sharedSubscriptionName = topic.substring(slashIndex, topic.indexOf(SLASH, slashIndex));
|
String sharedSubscriptionName = topic.substring(slashIndex, topic.indexOf(SLASH, slashIndex));
|
||||||
String parsedTopicName = topic.substring(topic.indexOf(SLASH, slashIndex) + 1);
|
String parsedTopicName = topic.substring(topic.indexOf(SLASH, slashIndex) + 1);
|
||||||
return new SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName);
|
return new SimpleString(sharedSubscriptionName).concat(".").concat(parsedTopicName);
|
||||||
} else {
|
} else {
|
||||||
return new SimpleString(session.getState().getClientId()).concat(".").concat(topic);
|
return new SimpleString(session.getState().getClientId()).concat(".").concat(topic);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,9 @@ Highlights:
|
||||||
subscription queues has changed. Previously the subscription queue was named according to the subscription name
|
subscription queues has changed. Previously the subscription queue was named according to the subscription name
|
||||||
provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name to be used across multiple subscriptions
|
provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name to be used across multiple subscriptions
|
||||||
whereas queues in the broker must be named uniquely. Now the subscription queue will be named according to the
|
whereas queues in the broker must be named uniquely. Now the subscription queue will be named according to the
|
||||||
subscription name, client ID, and topic name so that all subscription queue names will be unique. Before upgrading
|
subscription name and topic name so that all subscription queue names will be unique. Before upgrading please ensure
|
||||||
please ensure all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new
|
all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new subscription queue. If
|
||||||
subscription queue. If they are not empty you can move the messages to the new subscription queue administratively.
|
they are not empty you can move the messages to the new subscription queue administratively.
|
||||||
|
|
||||||
## 2.27.1
|
## 2.27.1
|
||||||
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
|
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
|
||||||
|
|
|
@ -73,8 +73,9 @@ public class SubscriptionTests extends MQTT5TestSupport {
|
||||||
});
|
});
|
||||||
consumer1.subscribe(SHARED_SUB, 0);
|
consumer1.subscribe(SHARED_SUB, 0);
|
||||||
|
|
||||||
assertNotNull(server.locateQueue(SUB_NAME));
|
Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(TOPIC));
|
||||||
assertEquals(TOPIC, server.locateQueue(SUB_NAME).getAddress().toString());
|
assertNotNull(sharedSubQueue);
|
||||||
|
assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
|
||||||
|
|
||||||
MqttClient consumer2 = createPahoClient("consumer2");
|
MqttClient consumer2 = createPahoClient("consumer2");
|
||||||
consumer2.connect();
|
consumer2.connect();
|
||||||
|
@ -90,7 +91,7 @@ public class SubscriptionTests extends MQTT5TestSupport {
|
||||||
});
|
});
|
||||||
consumer2.subscribe(SHARED_SUB, 1);
|
consumer2.subscribe(SHARED_SUB, 1);
|
||||||
|
|
||||||
assertEquals(2, server.locateQueue(SUB_NAME).getConsumerCount());
|
assertEquals(2, sharedSubQueue.getConsumerCount());
|
||||||
|
|
||||||
MqttClient producer = createPahoClient("producer");
|
MqttClient producer = createPahoClient("producer");
|
||||||
producer.connect();
|
producer.connect();
|
||||||
|
@ -127,7 +128,7 @@ public class SubscriptionTests extends MQTT5TestSupport {
|
||||||
consumer1.setCallback(new LatchedMqttCallback(ackLatch));
|
consumer1.setCallback(new LatchedMqttCallback(ackLatch));
|
||||||
consumer1.subscribe(SHARED_SUB, 1);
|
consumer1.subscribe(SHARED_SUB, 1);
|
||||||
|
|
||||||
Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC));
|
Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(TOPIC));
|
||||||
assertNotNull(sharedSubQueue);
|
assertNotNull(sharedSubQueue);
|
||||||
assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
|
assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
|
||||||
assertEquals(1, sharedSubQueue.getConsumerCount());
|
assertEquals(1, sharedSubQueue.getConsumerCount());
|
||||||
|
|
Loading…
Reference in New Issue