From 4484d05cf0d15c9b9388ea014b91d482b2f14a7b Mon Sep 17 00:00:00 2001 From: onlyMIT Date: Tue, 29 Jan 2019 17:58:36 +0800 Subject: [PATCH] ARTEMIS-2226 last consumer connection should close the previous consumer connection Multiple consumers using the same clientId in the cluster, the last consumer connection should close the previous consumer connection! ARTEMIS-2226 last consumer connection should close the previous consumer connection to address apache-rat-plugin:0.12:check ARTEMIS-2226 last consumer connection should close the previous consumer connection to address checkstyle ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure ARTEMIS-2226 last consumer connection should close the previous consumer connection add javadoc --- .../api/core/management/ManagementHelper.java | 4 + .../protocol/mqtt/MQTTConnectionManager.java | 9 +- .../protocol/mqtt/MQTTProtocolManager.java | 60 +- .../mqtt/MQTTProtocolManagerFactory.java | 6 +- .../core/protocol/mqtt/MQTTSession.java | 10 +- .../impl/ActiveMQServerControlImpl.java | 12 +- .../core/remoting/impl/AbstractAcceptor.java | 5 + .../cluster/impl/ClusterConnectionBridge.java | 16 +- .../cluster/impl/ClusterConnectionImpl.java | 17 + .../core/server/impl/ServerSessionImpl.java | 17 + .../NotificationActiveMQServerPlugin.java | 46 - .../management/NotificationTest.java | 1 - .../integration/mqtt/imported/MQTTTest.java | 12 +- .../mqtt/imported/MQTTTestSupport.java | 18 + .../MqttClusterRemoteSubscribeTest.java | 1070 +++++++++++++++-- 15 files changed, 1112 insertions(+), 191 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index bba8419022..53cb08716a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -84,6 +84,10 @@ public final class ManagementHelper { public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID"); + public static final SimpleString HDR_PROTOCOL_NAME = new SimpleString("_AMQ_Protocol_Name"); + + public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID"); + // Attributes ---------------------------------------------------- // Static -------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index bc511ea1dc..8efea0a9ae 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -162,14 +162,7 @@ public class MQTTConnectionManager { } private MQTTSessionState getSessionState(String clientId) { - /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ - MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); - if (state == null) { - state = new MQTTSessionState(clientId); - MQTTSession.SESSIONS.put(clientId, state); - } - - return state; + return session.getProtocolManager().getSessionState(clientId); } private String validateClientId(String clientId, boolean cleanSession) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 71d30d8492..6e914432ac 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -30,6 +30,9 @@ import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; @@ -40,11 +43,12 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.collections.TypedProperties; /** * MQTTProtocolManager */ -class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { +public class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { private static final List websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); @@ -55,18 +59,53 @@ class MQTTProtocolManager extends AbstractProtocolManager outgoingInterceptors = new ArrayList<>(); //TODO Read in a list of existing client IDs from stored Sessions. - private Map connectedClients = new ConcurrentHashMap<>(); + private final Map connectedClients; + private final Map sessionStates; MQTTProtocolManager(ActiveMQServer server, + Map connectedClients, + Map sessionStates, List incomingInterceptors, List outgoingInterceptors) { this.server = server; + this.connectedClients = connectedClients; + this.sessionStates = sessionStates; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); + server.getManagementService().addNotificationListener(this); } @Override public void onNotification(Notification notification) { - // TODO handle notifications + if (!(notification.getType() instanceof CoreNotificationType)) + return; + + CoreNotificationType type = (CoreNotificationType) notification.getType(); + if (type != CoreNotificationType.SESSION_CREATED) + return; + + TypedProperties props = notification.getProperties(); + + SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + + //Only process SESSION_CREATED notifications for the MQTT protocol + if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME)) + return; + + int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE); + + //distance > 0 means only processing notifications which are received from other nodes in the cluster + if (distance > 0) { + String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString(); + /* + * If there is a connection in the node with the same clientId as the value of the "_AMQ_Client_ID" attribute + * in the SESSION_CREATED notification, you need to close this connection. + * Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time. + */ + MQTTConnection mqttConnection = connectedClients.get(clientId); + if (mqttConnection != null) { + mqttConnection.destroy(); + } + } } @Override @@ -201,4 +240,17 @@ class MQTTProtocolManager extends AbstractProtocolManager getSessionStates() { + return new HashMap<>(sessionStates); + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 453b2671b1..74a29e66fb 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -37,12 +38,15 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory connectedClients = new ConcurrentHashMap<>(); + private final Map sessionStates = new ConcurrentHashMap<>(); + @Override public ProtocolManager createProtocolManager(ActiveMQServer server, final Map parameters, List incomingInterceptors, List outgoingInterceptors) throws Exception { - return BeanSupport.setData(new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors), parameters); + return BeanSupport.setData(new MQTTProtocolManager(server, connectedClients, sessionStates, incomingInterceptors, outgoingInterceptors), parameters); } @Override diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 640b893958..b788f36578 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -17,10 +17,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; @@ -30,8 +27,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; public class MQTTSession { - static Map SESSIONS = new ConcurrentHashMap<>(); - private final String id = UUID.randomUUID().toString(); private MQTTProtocolHandler protocolHandler; @@ -108,7 +103,7 @@ public class MQTTSession { if (isClean()) { clean(); - SESSIONS.remove(connection.getClientID()); + protocolManager.removeSessionState(connection.getClientID()); } } stopped = true; @@ -201,7 +196,4 @@ public class MQTTSession { return coreMessageObjectPools; } - public static Map getSessions() { - return new HashMap<>(SESSIONS); - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 318d880f0f..26b4ecaf36 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.DivertControl; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.client.impl.Topology; @@ -2967,7 +2968,16 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (!(notification.getType() instanceof CoreNotificationType)) return; CoreNotificationType type = (CoreNotificationType) notification.getType(); - TypedProperties prop = notification.getProperties(); + if (type == CoreNotificationType.SESSION_CREATED) { + TypedProperties props = notification.getProperties(); + /* + * If the SESSION_CREATED notification is received from another node in the cluster, no broadcast call is made. + * To keep the original logic to avoid calling the broadcast multiple times for the same SESSION_CREATED notification in the cluster. + */ + if (props.getIntProperty(ManagementHelper.HDR_DISTANCE) > 0) { + return; + } + } this.broadcaster.sendNotification(new Notification(type.toString(), this, notifSeq.incrementAndGet(), notification.toString())); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java index a2f30f3f62..1aa1dff754 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.remoting.impl; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,4 +44,8 @@ public abstract class AbstractAcceptor implements Acceptor { } } + public Map getProtocolMap() { + return Collections.unmodifiableMap(protocolMap); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index a9d80e570c..f7e281797c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -233,6 +233,8 @@ public class ClusterConnectionBridge extends BridgeImpl { " AND " + ManagementHelper.HDR_NOTIFICATION_TYPE + " IN ('" + + CoreNotificationType.SESSION_CREATED + + "','" + CoreNotificationType.BINDING_ADDED + "','" + CoreNotificationType.BINDING_REMOVED + @@ -252,6 +254,8 @@ public class ClusterConnectionBridge extends BridgeImpl { flowRecord.getMaxHops() + " AND (" + createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) + + ") AND (" + + createPermissiveManagementNotificationToFilter() + ")"); sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); @@ -351,10 +355,20 @@ public class ClusterConnectionBridge extends BridgeImpl { } filterString += "!" + storeAndForwardPrefix; filterString += ",!" + managementAddress; - filterString += ",!" + managementNotificationAddress; return filterString; } + /** + * Create a filter rule,in addition to SESSION_CREATED notifications, all other notifications using managementNotificationAddress + * as the routing address will be filtered. + * @return + */ + private String createPermissiveManagementNotificationToFilter() { + StringBuilder filterBuilder = new StringBuilder(ManagementHelper.HDR_NOTIFICATION_TYPE).append(" = '") + .append(CoreNotificationType.SESSION_CREATED).append("' OR (").append(ManagementHelper.HDR_ADDRESS) + .append(" NOT LIKE '").append(managementNotificationAddress).append("%')"); + return filterBuilder.toString(); + } @Override protected void nodeUP(TopologyMember member, boolean last) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 4b884b5c1c..aa68b81f90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1078,6 +1078,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn doUnProposalReceived(message); break; } + case SESSION_CREATED: { + doSessionCreated(message); + break; + } default: { throw ActiveMQMessageBundle.BUNDLE.invalidType(ntype); } @@ -1303,6 +1307,19 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn binding.disconnect(); } + private synchronized void doSessionCreated(final ClientMessage message) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace(ClusterConnectionImpl.this + " session created " + message); + } + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME)); + props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS)); + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID)); + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME)); + props.putIntProperty(ManagementHelper.HDR_DISTANCE, message.getIntProperty(ManagementHelper.HDR_DISTANCE) + 1); + managementService.sendNotification(new Notification(null, CoreNotificationType.SESSION_CREATED, props)); + } + private synchronized void doConsumerCreated(final ClientMessage message) throws Exception { if (logger.isTraceEnabled()) { logger.trace(ClusterConnectionImpl.this + " Consumer created " + message); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 6464eee2ab..04322df9a2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -272,6 +272,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (!xa) { tx = newTransaction(); } + //When the ServerSessionImpl initialization is complete, need to create and send a SESSION_CREATED notification. + sendSessionNotification(CoreNotificationType.SESSION_CREATED); } // ServerSession implementation --------------------------------------------------------------------------- @@ -422,6 +424,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } closed = true; + //When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification. + sendSessionNotification(CoreNotificationType.SESSION_CLOSED); if (server.hasBrokerSessionPlugins()) { server.callBrokerSessionPlugins(plugin -> plugin.afterCloseSession(this, failed)); @@ -429,6 +433,19 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } + private void sendSessionNotification(final CoreNotificationType type) throws Exception { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(this.getConnectionID().toString())); + props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(this.getUsername())); + props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(this.getName())); + + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(this.remotingConnection.getClientID())); + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, SimpleString.toSimpleString(this.remotingConnection.getProtocolName())); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, managementService.getManagementNotificationAddress()); + props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0); + managementService.sendNotification(new Notification(null, type, props)); + } + private void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception { if (securityEnabled) { securityStore.check(address, checkType, auth); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java index 29846aa0af..880f970a01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java @@ -26,7 +26,6 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; @@ -43,13 +42,11 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class); public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS"; - public static final String SEND_SESSION_NOTIFICATIONS = "SEND_SESSION_NOTIFICATIONS"; public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS"; public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS"; public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS"; private boolean sendConnectionNotifications; - private boolean sendSessionNotifications; private boolean sendAddressNotifications; private boolean sendDeliveredNotifications; private boolean sendExpiredNotifications; @@ -66,8 +63,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { public void init(Map properties) { sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS, Boolean.FALSE.toString())); - sendSessionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS, - Boolean.FALSE.toString())); sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS, Boolean.FALSE.toString())); sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS, @@ -96,16 +91,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED); } - @Override - public void afterCreateSession(ServerSession session) throws ActiveMQException { - sendSessionNotification(session, CoreNotificationType.SESSION_CREATED); - } - - @Override - public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { - sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED); - } - @Override public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED); @@ -196,23 +181,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { } } - private void sendSessionNotification(final ServerSession session, final CoreNotificationType type) { - final ManagementService managementService = getManagementService(); - - if (managementService != null && sendSessionNotifications) { - try { - final TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(session.getConnectionID().toString())); - props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(session.getUsername())); - props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName())); - - managementService.sendNotification(new Notification(null, type, props)); - } catch (Exception e) { - logger.warn("Error sending notification: " + type, e.getMessage(), e); - } - } - } - /** * @return the sendConnectionNotifications */ @@ -227,20 +195,6 @@ public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { this.sendConnectionNotifications = sendConnectionNotifications; } - /** - * @return the sendSessionNotifications - */ - public boolean isSendSessionNotifications() { - return sendSessionNotifications; - } - - /** - * @param sendSessionNotifications the sendSessionNotifications to set - */ - public void setSendSessionNotifications(boolean sendSessionNotifications) { - this.sendSessionNotifications = sendSessionNotifications; - } - /** * @return the sendDeliveredNotifications */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java index ed5713e293..196e939a2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java @@ -362,7 +362,6 @@ public class NotificationTest extends ActiveMQTestBase { NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin(); notificationPlugin.setSendAddressNotifications(true); notificationPlugin.setSendConnectionNotifications(true); - notificationPlugin.setSendSessionNotifications(true); notificationPlugin.setSendDeliveredNotifications(true); notificationPlugin.setSendExpiredNotifications(true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 03bcddd275..5b35f33be3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1120,7 +1120,7 @@ public class MQTTTest extends MQTTTestSupport { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST receive message from existing subscription from previous not clean session notClean = mqttNotClean.blockingConnection(); @@ -1132,7 +1132,7 @@ public class MQTTTest extends MQTTTestSupport { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session as existing subscription should be gone final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1144,7 +1144,7 @@ public class MQTTTest extends MQTTTestSupport { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); // MUST NOT receive message from previous clean session as existing subscription should be gone notClean = mqttNotClean.blockingConnection(); @@ -1153,7 +1153,7 @@ public class MQTTTest extends MQTTTestSupport { assertNull(msg); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); } @Test(timeout = 60 * 1000) @@ -1167,7 +1167,7 @@ public class MQTTTest extends MQTTTestSupport { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session even when creating a new subscription final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1179,7 +1179,7 @@ public class MQTTTest extends MQTTTestSupport { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); } @Test(timeout = 60 * 1000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index e49ec92ed6..83871e3210 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -28,6 +28,7 @@ import java.security.ProtectionDomain; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.fusesource.mqtt.client.MQTT; @@ -366,6 +372,18 @@ public class MQTTTestSupport extends ActiveMQTestBase { return mqtt; } + public Map getSessions() { + Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT"); + if (acceptor instanceof AbstractAcceptor) { + ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); + if (protocolManager instanceof MQTTProtocolManager) { + return ((MQTTProtocolManager) protocolManager).getSessionStates(); + } + + } + return Collections.emptyMap(); + } + private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception { MQTT mqtt = new MQTT(); mqtt.setConnectAttemptsMax(1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 8caba17d14..19360b16ae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -43,29 +44,127 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } @Test - public void unsubscribeRemoteQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + + subConnection2.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); BlockingConnection connection1 = null; BlockingConnection connection2 = null; try { - - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + connection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages @@ -73,9 +172,387 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + + Message message1 = connection1.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = connection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = connection1.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message21 = connection1.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + + String message11String = new String(message11.getPayload()); + String message21String = new String(message21.getPayload()); + String message31String = new String(message31.getPayload()); + assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) ); + assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); + assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + + } + + @Test + public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + + subConnection2.subscribe(topics); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); + + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + + } + + @Test + public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + + subConnection2.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); @@ -90,11 +567,17 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { assertEquals(payload3, new String(message3.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -111,14 +594,13 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); - } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -127,82 +609,192 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } @Test - public void unsubscribeRemoteQueueWildCard() throws Exception { - final String TOPIC = "test/+/some/#"; + public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); startServers(0, 1); - BlockingConnection connection1 = null; - BlockingConnection connection2 = null; + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); - // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; - connection1.subscribe(topics); - connection2.subscribe(topics); + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + subConnection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); // Publish Messages String payload1 = "This is message 1"; String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); - - Message message1 = connection1.receive(5, TimeUnit.SECONDS); + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); message1.ack(); - Message message2 = connection2.receive(5, TimeUnit.SECONDS); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); message2.ack(); - Message message3 = connection1.receive(5, TimeUnit.SECONDS); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); message3.ack(); assertEquals(payload1, new String(message1.getPayload())); assertEquals(payload2, new String(message2.getPayload())); assertEquals(payload3, new String(message3.getPayload())); + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); - connection2.unsubscribe(new String[]{TOPIC}); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); - Message message21 = connection1.receive(5, TimeUnit.SECONDS); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + Message message31 = connection1.receive(5, TimeUnit.SECONDS); message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); - String message11String = new String(message11.getPayload()); - String message21String = new String(message21.getPayload()); - String message31String = new String(message31.getPayload()); - - assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String)); - assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String)); - assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -211,31 +803,39 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } @Test - public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; final String TOPIC2 = "sample"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); startServers(0, 1); BlockingConnection connection1 = null; BlockingConnection connection2 = null; try { - - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + //Waiting for resource initialization to complete + Thread.sleep(5000); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - connection1.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE)}); - connection2.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + connection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + connection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages String payload1 = "This is message 1"; @@ -243,9 +843,9 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); @@ -263,11 +863,17 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { assertEquals(payload3, new String(message3.getPayload())); assertEquals(payload4, new String(message4.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); @@ -289,12 +895,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } finally { - if (connection1 != null) { - connection1.unsubscribe(new String[]{TOPIC}); + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(new String[]{ANYCAST_TOPIC}); connection1.disconnect(); } - if (connection2 != null) { - connection2.unsubscribe(new String[]{TOPIC, TOPIC2}); + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); connection2.disconnect(); } } @@ -302,33 +908,36 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } @Test - public void unsubscribeExistingQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String TOPIC2 = "sample"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); startServers(0, 1); - BlockingConnection connection1 = null; - BlockingConnection connection2 = null; - BlockingConnection connection3 = null; + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); - connection3 = retrieveMQTTConnection("tcp://localhost:61617"); - // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; - connection1.subscribe(topics); - connection2.subscribe(topics); - connection3.subscribe(topics); + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + subConnection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 2, true); - - waitForBindings(0, TOPIC, 1, 2, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); // Publish Messages String payload1 = "This is message 1"; @@ -336,10 +945,232 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection2.receive(5, TimeUnit.SECONDS); + message4.ack(); + + String messageStr1 = new String(message1.getPayload()); + String messageStr2 = new String(message2.getPayload()); + String messageStr3 = new String(message3.getPayload()); + String messageStr4 = new String(message4.getPayload()); + assertTrue(payload1.equals(messageStr1) || payload1.equals(messageStr2) || payload1.equals(messageStr3) || payload1.equals(messageStr4)); + assertTrue(payload2.equals(messageStr1) || payload2.equals(messageStr2) || payload2.equals(messageStr3) || payload2.equals(messageStr4)); + assertTrue(payload3.equals(messageStr1) || payload3.equals(messageStr2) || payload3.equals(messageStr3) || payload3.equals(messageStr4)); + assertTrue(payload4.equals(messageStr1) || payload4.equals(messageStr2) || payload4.equals(messageStr3) || payload4.equals(messageStr4)); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + message11.ack(); + assertEquals(payload4, new String(message11.getPayload())); + + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + Message message41 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message41); + + } finally { + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(new String[]{ANYCAST_TOPIC}); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useSameClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection subConnection3 = null; + BlockingConnection pubConnection = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + + //Waiting for the first sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + + subConnection3 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + + //Waiting for the second sub connection be closed + assertTrue(waitConnectionClosed(subConnection1)); + + // Subscribe to topics + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + + subConnection3.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection3.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection3.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection3.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection3.receive(5, TimeUnit.SECONDS); + message4.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload4, new String(message4.getPayload())); + + subConnection3.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (subConnection3 != null && subConnection3.isConnected()) { + subConnection3.unsubscribe(topics); + subConnection3.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.unsubscribe(topics); + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + final String clientId3 = "clientId3"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + BlockingConnection connection3 = null; + try { + //Waiting for resource initialization to complete + Thread.sleep(5000); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3); + // Subscribe to topics + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection3.subscribe(topics); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 2, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 2, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); message1.ack(); @@ -355,12 +1186,17 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { assertEquals(payload3, new String(message3.getPayload())); assertEquals(payload4, new String(message4.getPayload())); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection2.unsubscribe(new String[]{TOPIC}); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -377,16 +1213,16 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } - if (connection3 != null) { + if (connection3 != null && connection3.isConnected()) { connection3.unsubscribe(topics); connection3.disconnect(); } @@ -395,9 +1231,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { } - private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); + mqtt.setClientId(clientId); + mqtt.setConnectAttemptsMax(0); + mqtt.setReconnectAttemptsMax(0); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); return connection; @@ -450,4 +1289,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { return wildcardConfiguration; } + private boolean waitConnectionClosed(BlockingConnection connection) throws Exception { + return Wait.waitFor(() -> !connection.isConnected()); + } }