ARTEMIS-1391 embedding 2 MQTT brokers is broken

This commit is contained in:
Jens Reimann 2017-09-07 17:23:09 +02:00 committed by Clebert Suconic
parent 53c8ee007b
commit 144dbadcb5
6 changed files with 71 additions and 59 deletions

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.Set;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
@ -39,9 +37,6 @@ public class MQTTConnectionManager {
private MQTTSession session;
//TODO Read in a list of existing client IDs from stored Sessions.
public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<>();
private MQTTLogger log = MQTTLogger.LOGGER;
private boolean isWill = false;
@ -149,7 +144,7 @@ public class MQTTConnectionManager {
session.getSessionState().setAttached(false);
String clientId = session.getSessionState().getClientId();
if (clientId != null) {
CONNECTED_CLIENTS.remove(clientId);
session.getProtocolManager().getConnectedClients().remove(clientId);
}
}
}
@ -181,7 +176,7 @@ public class MQTTConnectionManager {
// [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
return null;
}
} else if (!CONNECTED_CLIENTS.add(clientId)) {
} else if (!session.getProtocolManager().getConnectedClients().add(clientId)) {
// ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it.

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
/**
* MQTTProtocolManager
@ -52,6 +54,9 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
//TODO Read in a list of existing client IDs from stored Sessions.
private Set<String> connectedClients = new ConcurrentHashSet<>();
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
@ -172,4 +177,8 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
public Set<String> getConnectedClients() {
return connectedClients;
}
}

View File

@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -44,10 +42,6 @@ public class MQTTFQQNTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}

View File

@ -16,27 +16,25 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.felix.resolver.util.ArrayMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override
@ -45,10 +43,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}

View File

@ -40,15 +40,15 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -85,12 +85,7 @@ public class MQTTTest extends MQTTTestSupport {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
@Test
@ -1990,4 +1985,49 @@ public class MQTTTest extends MQTTTestSupport {
}
assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
}
@Test
public void testDoubleBroker() throws Exception {
/*
* Start two embedded server instances for MQTT and connect to them
* with the same MQTT client id. As those are two different instances
* connecting to them with the same client ID must succeed.
*/
final int port1 = 1884;
final int port2 = 1885;
final Configuration cfg1 = createDefaultConfig(1, false);
cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT");
final Configuration cfg2 = createDefaultConfig(2, false);
cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT");
final ActiveMQServer server1 = createServer(cfg1);
server1.start();
final ActiveMQServer server2 = createServer(cfg2);
server2.start();
final String clientId = "client1";
final MQTT mqtt1 = createMQTTConnection(clientId, true);
final MQTT mqtt2 = createMQTTConnection(clientId, true);
mqtt1.setHost("localhost", port1);
mqtt2.setHost("localhost", port2);
final BlockingConnection connection1 = mqtt1.blockingConnection();
final BlockingConnection connection2 = mqtt2.blockingConnection();
try {
connection1.connect();
connection2.connect();
} catch (Exception e) {
fail("Connections should have worked.");
} finally {
if (connection1.isConnected())
connection1.disconnect();
if (connection2.isConnected())
connection2.disconnect();
}
}
}

View File

@ -16,20 +16,14 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@ -61,20 +55,6 @@ public class MqttPluginTest extends MQTTTestSupport {
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
@Override
@Before
public void setUp() throws Exception {
Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
sessions.setAccessible(true);
sessions.set(null, new ConcurrentHashMap<>());
Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
connectedClients.setAccessible(true);
connectedClients.set(null, new ConcurrentHashSet<>());
super.setUp();
}
@Override
public void configureBroker() throws Exception {
super.configureBroker();