ARTEMIS-788 Update MQTT Protocol

This commit is contained in:
Martyn Taylor 2016-12-09 17:47:32 +00:00
parent 89e6ec36bb
commit 887b8c8532
3 changed files with 100 additions and 5 deletions
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported

View File

@ -25,8 +25,10 @@ import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class MQTTSubscriptionManager {
@ -61,7 +63,8 @@ public class MQTTSubscriptionManager {
synchronized void start() throws Exception {
for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName());
Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value());
createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
}
}
@ -84,13 +87,17 @@ public class MQTTSubscriptionManager {
/**
* Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name.
*/
private Queue createQueueForSubscription(String topic, int qos) throws Exception {
String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
private Queue createQueueForSubscription(String address, int qos) throws Exception {
SimpleString queue = getQueueNameForTopic(address);
Queue q = session.getServer().locateQueue(queue);
if (q == null) {
q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false);
} else {
if (q.isDeleteOnNoConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
}
}
return q;
}
@ -113,9 +120,15 @@ public class MQTTSubscriptionManager {
int qos = subscription.qualityOfService().value();
String topic = subscription.topicName();
String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
if (addressInfo != null && addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) {
throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
}
session.getSessionState().addSubscription(subscription);
Queue q = createQueueForSubscription(topic, qos);
Queue q = createQueueForSubscription(coreAddress, qos);
if (s == null) {
createConsumerForSubscriptionQueue(q, topic, qos);

View File

@ -22,6 +22,7 @@ import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.EOFException;
import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.util.ArrayList;
@ -34,8 +35,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
@ -1612,4 +1615,79 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception {
Exception peerDisconnectedException = null;
try {
String clientId = "test.client";
SimpleString coreAddress = new SimpleString("foo.bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
AddressInfo addressInfo = new AddressInfo(coreAddress);
addressInfo.setDefaultMaxConsumers(0);
getServer().createOrUpdateAddressInfo(addressInfo);
getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(mqttSubscription);
} catch (EOFException e) {
peerDisconnectedException = e;
}
assertNotNull(peerDisconnectedException);
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
}
@Test(timeout = 60 * 1000)
public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception {
Exception peerDisconnectedException = null;
try {
String clientId = "test.mqtt";
SimpleString coreAddress = new SimpleString("foo.bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
AddressInfo addressInfo = new AddressInfo(coreAddress);
addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
getServer().createOrUpdateAddressInfo(addressInfo);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(mqttSubscription);
} catch (EOFException e) {
peerDisconnectedException = e;
}
assertNotNull(peerDisconnectedException);
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
}
@Test(timeout = 60 * 1000)
public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception {
Exception peerDisconnectedException = null;
try {
String clientId = "testMqtt";
SimpleString coreAddress = new SimpleString("foo.bar");
getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true);
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(mqttSubscription);
} catch (EOFException e) {
peerDisconnectedException = e;
}
assertNotNull(peerDisconnectedException);
assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
}
}

View File

@ -92,6 +92,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return name.getMethodName();
}
public ActiveMQServer getServer() {
return server;
}
@Override
@Before
public void setUp() throws Exception {