From dac625179ac95e4953be3cb2ae6e2a8513235b38 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 15 Sep 2017 10:59:57 -0500 Subject: [PATCH] ARTEMIS-1218 implement MQTT link stealing --- .../protocol/mqtt/MQTTConnectionManager.java | 19 +++-- .../protocol/mqtt/MQTTProtocolManager.java | 24 ++++-- .../integration/mqtt/imported/MQTTTest.java | 77 +++++++++---------- 3 files changed, 66 insertions(+), 54 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 79b97a3ff0..7e88028550 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -143,8 +143,12 @@ public class MQTTConnectionManager { if (session.getSessionState() != null) { session.getSessionState().setAttached(false); String clientId = session.getSessionState().getClientId(); - if (clientId != null) { - session.getProtocolManager().getConnectedClients().remove(clientId); + /** + * ensure that the connection for the client ID matches *this* connection otherwise we could remove the + * entry for the client who "stole" this client ID via [MQTT-3.1.4-2] + */ + if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) { + session.getProtocolManager().removeConnectedClient(clientId); } } } @@ -176,12 +180,13 @@ public class MQTTConnectionManager { // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null return null; } - } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) { - // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it. + } else { + MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId, session.getConnection()); - - // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID - return null; + if (connection != null) { + // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client + connection.disconnect(false); + } } return clientId; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 8ee40339f8..c8832bafaa 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -19,7 +19,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -39,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; /** * MQTTProtocolManager @@ -55,7 +55,7 @@ class MQTTProtocolManager extends AbstractProtocolManager outgoingInterceptors = new ArrayList<>(); //TODO Read in a list of existing client IDs from stored Sessions. - private Set connectedClients = new ConcurrentHashSet<>(); + private Map connectedClients = new ConcurrentHashMap<>(); MQTTProtocolManager(ActiveMQServer server, List incomingInterceptors, @@ -178,7 +178,21 @@ class MQTTProtocolManager extends AbstractProtocolManager getConnectedClients() { - return connectedClients; + public boolean isClientConnected(String clientId, MQTTConnection connection) { + return connectedClients.get(clientId).equals(connection); + } + + public void removeConnectedClient(String clientId) { + connectedClients.remove(clientId); + } + + /** + * @param clientId + * @param connection + * @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for + * the {@code clientId} + */ + public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { + return connectedClients.put(clientId, connection); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 9087938dd4..8a1eea2f73 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.activemq.artemis.api.core.RoutingType; @@ -56,7 +57,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.MQTTException; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; @@ -1350,11 +1350,8 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } - @Ignore @Test(timeout = 60 * 1000) - // TODO We currently do not support link stealing. This needs to be enabled for this test to pass. public void testDuplicateClientId() throws Exception { - // test link stealing enabled by default final String clientId = "duplicateClient"; MQTT mqtt = createMQTTConnection(clientId, false); mqtt.setKeepAlive((short) 2); @@ -1384,31 +1381,45 @@ public class MQTTTest extends MQTTTestSupport { connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); connection1.disconnect(); + } - // disable link stealing - stopBroker(); - protocolConfig = "allowLinkStealing=false"; - startBroker(); + @Test(timeout = 60 * 1000) + public void testRepeatedLinkStealing() throws Exception { + final String clientId = "duplicateClient"; + final AtomicReference oldConnection = new AtomicReference<>(); + final String TOPICA = "TopicA"; - mqtt = createMQTTConnection(clientId, false); - mqtt.setKeepAlive((short) 2); - final BlockingConnection connection2 = mqtt.blockingConnection(); - connection2.connect(); - connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + for (int i = 1; i <= 10; ++i) { - mqtt1 = createMQTTConnection(clientId, false); - mqtt1.setKeepAlive((short) 2); - final BlockingConnection connection3 = mqtt1.blockingConnection(); - try { - connection3.connect(); - fail("Duplicate client connected"); - } catch (Exception e) { - // ignore + LOG.info("Creating MQTT Connection {}", i); + + MQTT mqtt = createMQTTConnection(clientId, false); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + + assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return connection.isConnected(); + } + }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200))); + + if (oldConnection.get() != null) { + assertTrue("Old client still connected on attempt: " + i, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return !oldConnection.get().isConnected(); + } + }, TimeUnit.SECONDS.toMillis(3), TimeUnit.MILLISECONDS.toMillis(200))); + } + + oldConnection.set(connection); } - assertTrue("Old client disconnected", connection2.isConnected()); - connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); - connection2.disconnect(); + oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + oldConnection.get().disconnect(); } @Test(timeout = 30 * 10000) @@ -1968,24 +1979,6 @@ public class MQTTTest extends MQTTTestSupport { } - @Test - public void testDuplicateIDReturnsError() throws Exception { - String clientId = "clientId"; - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(clientId); - mqtt.blockingConnection().connect(); - - MQTTException e = null; - try { - MQTT mqtt2 = createMQTTConnection(); - mqtt2.setClientId(clientId); - mqtt2.blockingConnection().connect(); - } catch (MQTTException mqttE) { - e = mqttE; - } - assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED")); - } - @Test public void testDoubleBroker() throws Exception { /*