This closes #913
This commit is contained in:
commit
47f46501e3
|
@ -17,17 +17,21 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
import org.apache.activemq.artemis.api.core.FilterConstants;
|
import org.apache.activemq.artemis.api.core.FilterConstants;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
|
||||||
|
@ -89,20 +93,65 @@ public class MQTTSubscriptionManager {
|
||||||
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
|
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
|
||||||
*/
|
*/
|
||||||
private Queue createQueueForSubscription(String address, int qos) throws Exception {
|
private Queue createQueueForSubscription(String address, int qos) throws Exception {
|
||||||
|
// Check to see if a subscription queue already exists.
|
||||||
SimpleString queue = getQueueNameForTopic(address);
|
SimpleString queue = getQueueNameForTopic(address);
|
||||||
|
|
||||||
Queue q = session.getServer().locateQueue(queue);
|
Queue q = session.getServer().locateQueue(queue);
|
||||||
|
|
||||||
|
// The queue does not exist so we need to create it.
|
||||||
if (q == null) {
|
if (q == null) {
|
||||||
q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
|
SimpleString sAddress = SimpleString.toSimpleString(address);
|
||||||
} else {
|
|
||||||
if (q.isDeleteOnNoConsumers()) {
|
// Check we can auto create queues.
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
|
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress);
|
||||||
|
if (!bindingQueryResult.isAutoCreateQueues()) {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that the address exists, if not we try to auto create it.
|
||||||
|
AddressInfo addressInfo = session.getServerSession().getAddress(sAddress);
|
||||||
|
if (addressInfo == null) {
|
||||||
|
if (!bindingQueryResult.isAutoCreateAddresses()) {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
|
||||||
|
}
|
||||||
|
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false);
|
||||||
|
}
|
||||||
|
return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
|
||||||
}
|
}
|
||||||
return q;
|
return q;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
|
||||||
|
|
||||||
|
if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
|
||||||
|
return session.getServerSession().createQueue(addressInfo.getName(), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
|
||||||
|
if (!bindingQueryResult.getQueueNames().isEmpty()) {
|
||||||
|
SimpleString name = null;
|
||||||
|
for (SimpleString qName : bindingQueryResult.getQueueNames()) {
|
||||||
|
if (name == null) {
|
||||||
|
name = qName;
|
||||||
|
} else if (qName.equals(addressInfo.getName())) {
|
||||||
|
name = qName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return session.getServer().locateQueue(name);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
return session.getServerSession().createQueue(addressInfo.getName(), addressInfo.getName(), RoutingType.ANYCAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
|
||||||
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
|
return session.getServer().locateQueue(addressInfo.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<RoutingType> routingTypeSet = new HashSet();
|
||||||
|
routingTypeSet.add(RoutingType.MULTICAST);
|
||||||
|
routingTypeSet.add(RoutingType.ANYCAST);
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(addressInfo.getRoutingType(), addressInfo.getName().toString(), routingTypeSet);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new consumer for the queue associated with a subscription
|
* Creates a new consumer for the queue associated with a subscription
|
||||||
*/
|
*/
|
||||||
|
@ -122,10 +171,6 @@ public class MQTTSubscriptionManager {
|
||||||
String topic = subscription.topicName();
|
String topic = subscription.topicName();
|
||||||
|
|
||||||
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
|
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
|
||||||
AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
|
|
||||||
if (addressInfo != null && !addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
|
|
||||||
throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), RoutingType.MULTICAST, addressInfo.getRoutingTypes());
|
|
||||||
}
|
|
||||||
|
|
||||||
session.getSessionState().addSubscription(subscription);
|
session.getSessionState().addSubscription(subscription);
|
||||||
|
|
||||||
|
|
|
@ -1510,7 +1510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo getAddress(SimpleString address) {
|
public AddressInfo getAddress(SimpleString address) {
|
||||||
return server.getPostOffice().getAddressInfo(address);
|
return server.getPostOffice().getAddressInfo(removePrefix(address));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,9 +27,11 @@ import java.lang.reflect.Field;
|
||||||
import java.net.ProtocolException;
|
import java.net.ProtocolException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -38,7 +40,6 @@ import java.util.regex.Pattern;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
|
||||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
||||||
|
@ -1642,50 +1643,107 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception {
|
public void testAnycastPrefixWorksWithMQTT() throws Exception {
|
||||||
Exception peerDisconnectedException = null;
|
String clientId = "testMqtt";
|
||||||
try {
|
|
||||||
String clientId = "test.mqtt";
|
|
||||||
SimpleString coreAddress = new SimpleString("foo.bar");
|
|
||||||
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
|
||||||
|
|
||||||
AddressInfo addressInfo = new AddressInfo(coreAddress);
|
String anycastAddress = "anycast:foo/bar";
|
||||||
addressInfo.addRoutingType(RoutingType.ANYCAST);
|
String sendAddress = "foo/bar";
|
||||||
getServer().createOrUpdateAddressInfo(addressInfo);
|
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
|
||||||
|
|
||||||
MQTT mqtt = createMQTTConnection();
|
MQTT mqtt = createMQTTConnection();
|
||||||
mqtt.setClientId(clientId);
|
mqtt.setClientId(clientId);
|
||||||
mqtt.setKeepAlive((short) 2);
|
BlockingConnection connection1 = mqtt.blockingConnection();
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
connection1.connect();
|
||||||
connection.connect();
|
connection1.subscribe(mqttSubscription);
|
||||||
connection.subscribe(mqttSubscription);
|
|
||||||
} catch (EOFException e) {
|
MQTT mqtt2 = createMQTTConnection();
|
||||||
peerDisconnectedException = e;
|
mqtt2.setClientId(clientId + "2");
|
||||||
}
|
BlockingConnection connection2 = mqtt2.blockingConnection();
|
||||||
assertNotNull(peerDisconnectedException);
|
connection2.connect();
|
||||||
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
|
connection2.subscribe(mqttSubscription);
|
||||||
|
|
||||||
|
String message1 = "TestMessage1";
|
||||||
|
String message2 = "TestMessage2";
|
||||||
|
|
||||||
|
connection1.publish(sendAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
connection2.publish(sendAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
|
||||||
|
assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception {
|
public void testAnycastAddressWorksWithMQTT() throws Exception {
|
||||||
Exception peerDisconnectedException = null;
|
String anycastAddress = "foo/bar";
|
||||||
try {
|
|
||||||
String clientId = "testMqtt";
|
|
||||||
SimpleString coreAddress = new SimpleString("foo.bar");
|
|
||||||
getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
|
|
||||||
|
|
||||||
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
|
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST));
|
||||||
|
String clientId = "testMqtt";
|
||||||
|
|
||||||
|
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
|
||||||
|
|
||||||
MQTT mqtt = createMQTTConnection();
|
MQTT mqtt = createMQTTConnection();
|
||||||
mqtt.setClientId(clientId);
|
mqtt.setClientId(clientId);
|
||||||
mqtt.setKeepAlive((short) 2);
|
BlockingConnection connection1 = mqtt.blockingConnection();
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
connection1.connect();
|
||||||
connection.connect();
|
connection1.subscribe(mqttSubscription);
|
||||||
connection.subscribe(mqttSubscription);
|
|
||||||
} catch (EOFException e) {
|
MQTT mqtt2 = createMQTTConnection();
|
||||||
peerDisconnectedException = e;
|
mqtt2.setClientId(clientId + "2");
|
||||||
|
BlockingConnection connection2 = mqtt2.blockingConnection();
|
||||||
|
connection2.connect();
|
||||||
|
connection2.subscribe(mqttSubscription);
|
||||||
|
|
||||||
|
String message1 = "TestMessage1";
|
||||||
|
String message2 = "TestMessage2";
|
||||||
|
|
||||||
|
connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
|
||||||
|
assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
}
|
}
|
||||||
assertNotNull(peerDisconnectedException);
|
|
||||||
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testAmbiguousRoutingWithMQTT() throws Exception {
|
||||||
|
String anycastAddress = "foo/bar";
|
||||||
|
|
||||||
|
Set<RoutingType> routingTypeSet = new HashSet<>();
|
||||||
|
routingTypeSet.add(RoutingType.ANYCAST);
|
||||||
|
routingTypeSet.add(RoutingType.MULTICAST);
|
||||||
|
|
||||||
|
getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet));
|
||||||
|
String clientId = "testMqtt";
|
||||||
|
|
||||||
|
Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
|
||||||
|
|
||||||
|
MQTT mqtt = createMQTTConnection();
|
||||||
|
mqtt.setClientId(clientId);
|
||||||
|
BlockingConnection connection1 = mqtt.blockingConnection();
|
||||||
|
connection1.connect();
|
||||||
|
connection1.subscribe(mqttSubscription);
|
||||||
|
|
||||||
|
MQTT mqtt2 = createMQTTConnection();
|
||||||
|
mqtt2.setClientId(clientId + "2");
|
||||||
|
BlockingConnection connection2 = mqtt2.blockingConnection();
|
||||||
|
connection2.connect();
|
||||||
|
connection2.subscribe(mqttSubscription);
|
||||||
|
|
||||||
|
String message1 = "TestMessage1";
|
||||||
|
String message2 = "TestMessage2";
|
||||||
|
|
||||||
|
connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
|
||||||
|
assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,8 +167,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.PORT_PROP_NAME, "" + port);
|
params.put(TransportConstants.PORT_PROP_NAME, "" + port);
|
||||||
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
|
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
|
||||||
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
|
|
||||||
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
|
server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:");
|
||||||
|
|
||||||
LOG.info("Added connector {} to broker", getProtocolScheme());
|
LOG.info("Added connector {} to broker", getProtocolScheme());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue