From af2672e79a597e46af297a943f34bbb937a0c9b7 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 20 Dec 2022 11:47:18 -0600 Subject: [PATCH] ARTEMIS-966 MQTT subscription state isn't durable Durable subscrption state is part of the MQTT specification which has not been supported until now. This functionality is implemented via an internal last-value queue. When an MQTT client creates, updates, or adds a subscription a message using the client-ID as the last-value is sent to the internal queue. When the broker restarts this data is read from the queue and populates the in-memory MQTT data-structures. Therefore subscribers can reconnect and resume their session's subscriptions without have to manually resubscribe. MQTT state is now managed centrally per-broker rather than in the MQTTProtocolManager since there is one instance of MQTTProtocolManager for each acceptor allowing MQTT connections. Managing state per acceptor would allow odd behavior with clients connecting to different acceptors with the same client ID. The subscriptions are serialized as raw bytes with a "version" byte for potential future use, but I intentionally avoided adding complex scaffolding to support multiple versions. We can add that complexity later if necessary. Some tests needed to be changed since instantiating an MQTT protocol manager now creates an internal queue. A handful of tests assume that no queues will exist other than the ones they create themselves. I updated the main test super-class so that an MQTT protocol manager is not automatically instantiated when configuring a broker for in-vm support. --- .../activemq/artemis/utils/RandomUtil.java | 5 + .../core/message/impl/CoreMessage.java | 14 +- .../core/protocol/mqtt/MQTTBundle.java | 33 +++ .../protocol/mqtt/MQTTConnectionManager.java | 11 +- .../core/protocol/mqtt/MQTTLogger.java | 3 + .../protocol/mqtt/MQTTProtocolHandler.java | 18 +- .../protocol/mqtt/MQTTProtocolManager.java | 109 ++------ .../mqtt/MQTTProtocolManagerFactory.java | 2 +- .../core/protocol/mqtt/MQTTSession.java | 28 +- .../core/protocol/mqtt/MQTTSessionState.java | 101 +++++-- .../core/protocol/mqtt/MQTTStateManager.java | 246 ++++++++++++++++++ .../mqtt/MQTTSubscriptionManager.java | 108 ++++---- .../artemis/core/protocol/mqtt/MQTTUtil.java | 2 + .../core/protocol/mqtt/StateSerDeTest.java | 108 ++++++++ .../impl/invm/InVMAcceptorFactory.java | 5 + .../artemis/core/server/impl/QueueImpl.java | 2 + .../spi/core/remoting/AcceptorFactory.java | 4 + .../artemis/tests/util/ActiveMQTestBase.java | 3 + docs/user-manual/mqtt.adoc | 6 + .../resources/activemq/server0/broker.xml | 2 +- .../AmqpFailoverEndpointDiscoveryTest.java | 17 ++ .../AmqpReplicatedLargeMessageTest.java | 4 +- .../client/MessageExpirationTest.java | 1 + .../integration/client/UpdateQueueTest.java | 12 +- .../journal/MessageJournalTest.java | 2 +- .../tests/integration/mqtt/MQTTFQQNTest.java | 11 +- .../mqtt/MQTTSecurityManagerTest.java | 4 +- .../tests/integration/mqtt/MQTTTest.java | 4 +- .../integration/mqtt/MQTTTestSupport.java | 2 +- .../mqtt/MqttClusterRemoteSubscribeTest.java | 22 +- .../mqtt/MqttWildCardSubAutoCreateTest.java | 10 +- .../tests/integration/mqtt5/MQTT5Test.java | 118 +++++++++ .../integration/mqtt5/MQTT5TestSupport.java | 8 +- .../mqtt5/spec/MessageReceiptTests.java | 8 +- .../security/SecurityPerAcceptorJmsTest.java | 6 +- .../integration/security/SecurityTest.java | 4 +- 36 files changed, 814 insertions(+), 229 deletions(-) create mode 100644 artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java create mode 100644 artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java create mode 100644 artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java index 8ddd5f5871..54f44bd139 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java @@ -60,6 +60,11 @@ public class RandomUtil { return Math.abs(RandomUtil.randomInt()); } + public static Integer randomPositiveIntOrNull() { + Integer random = RandomUtil.randomInt(); + return random % 5 == 0 ? null : Math.abs(random); + } + public static ActiveMQBuffer randomBuffer(final int size, final long... data) { ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index a0c73d6979..8214e6d628 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1270,9 +1270,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public String toString() { try { final TypedProperties properties = getProperties(); - return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + - ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + - ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this); + return "CoreMessage[messageID=" + messageID + + ", durable=" + isDurable() + + ", userID=" + getUserID() + + ", priority=" + this.getPriority() + + ", timestamp=" + toDate(getTimestamp()) + + ", expiration=" + toDate(getExpiration()) + + ", durable=" + durable + + ", address=" + getAddress() + + ", size=" + getPersistentSize() + + ", properties=" + properties + + "]@" + System.identityHashCode(this); } catch (Throwable e) { logger.warn("Error creating String for message: ", e); return "ServerMessage[messageID=" + messageID + "]"; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java new file mode 100644 index 0000000000..0bbecb362e --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.mqtt; + +import org.apache.activemq.artemis.logs.BundleFactory; +import org.apache.activemq.artemis.logs.annotation.LogBundle; +import org.apache.activemq.artemis.logs.annotation.Message; + +/** + * Logger Code 85 + */ +@LogBundle(projectCode = "AMQ", regexID = "85[0-9]{4}") +public interface MQTTBundle { + + MQTTBundle BUNDLE = BundleFactory.newBundle(MQTTBundle.class); + + @Message(id = 850000, value = "Unable to store MQTT state within given timeout: {}ms") + IllegalStateException unableToStoreMqttState(long timeout); +} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 1cdcc149e4..eecc76166d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -67,7 +67,7 @@ public class MQTTConnectionManager { boolean cleanStart = connect.variableHeader().isCleanSession(); String clientId = session.getConnection().getClientID(); - boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId); + boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId); MQTTSessionState sessionState = getSessionState(clientId); synchronized (sessionState) { session.setSessionState(sessionState); @@ -120,6 +120,7 @@ public class MQTTConnectionManager { connackProperties = getConnackProperties(); } else { + sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval()); connackProperties = MqttProperties.NO_PROPERTIES; } @@ -193,15 +194,15 @@ public class MQTTConnectionManager { * ensure that the connection for the client ID matches *this* connection otherwise we could remove the * entry for the client who "stole" this client ID via [MQTT-3.1.4-2] */ - if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) { - session.getProtocolManager().removeConnectedClient(clientId); + if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) { + session.getStateManager().removeConnectedClient(clientId); } } } } } - private synchronized MQTTSessionState getSessionState(String clientId) { - return session.getProtocolManager().getSessionState(clientId); + private synchronized MQTTSessionState getSessionState(String clientId) throws Exception { + return session.getStateManager().getSessionState(clientId); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java index 6611482b41..834dadba2c 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java @@ -58,4 +58,7 @@ public interface MQTTLogger { @LogMessage(id = 834007, value = "Authorization failure sending will message: {}", level = LogMessage.Level.ERROR) void authorizationFailureSendingWillMessage(String message); + + @LogMessage(id = 834008, value = "Failed to remove session state for client with ID: {}", level = LogMessage.Level.ERROR) + void failedToRemoveSessionState(String clientID, Exception e); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 8ea0a5db66..3c5c139a0d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -44,8 +44,8 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException; +import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; @@ -57,6 +57,7 @@ import java.lang.invoke.MethodHandles; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the @@ -257,7 +258,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) { return; } else { - protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection()); + protocolManager.getStateManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection()); } if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) { @@ -377,7 +378,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleSubscribe(MqttSubscribeMessage message) throws Exception { - int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), message.idAndPropertiesVariableHeader().properties()); + Integer subscriptionIdentifier = MQTTUtil.getProperty(Integer.class, message.idAndPropertiesVariableHeader().properties(), SUBSCRIPTION_IDENTIFIER, null); + int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), subscriptionIdentifier); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(message.variableHeader().messageId(), MqttProperties.NO_PROPERTIES); MqttSubAckMessage subAck = new MqttSubAckMessage(header, variableHeader, new MqttSubAckPayload(qos)); @@ -385,7 +387,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { - short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); + short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics(), true); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage unsubAck; if (session.getVersion() == MQTTVersion.MQTT_5) { @@ -462,14 +464,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { * * However, this behavior is configurable via the "allowLinkStealing" acceptor URL property. */ - private LinkStealingResult handleLinkStealing() { + private LinkStealingResult handleLinkStealing() throws Exception { final String clientID = session.getConnection().getClientID(); LinkStealingResult result; - if (protocolManager.isClientConnected(clientID)) { - MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID); + if (protocolManager.getStateManager().isClientConnected(clientID)) { + MQTTConnection existingConnection = protocolManager.getStateManager().getConnectedClient(clientID); if (protocolManager.isAllowLinkStealing()) { - MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession(); + MQTTSession existingSession = protocolManager.getStateManager().getSessionState(clientID).getSession(); if (existingSession != null) { if (existingSession.getVersion() == MQTTVersion.MQTT_5) { existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index fef99a75a0..8d5d2d9e28 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,12 +16,10 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -36,6 +34,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; @@ -47,7 +46,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; public class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { @@ -60,9 +58,6 @@ public class MQTTProtocolManager extends AbstractProtocolManager incomingInterceptors = new ArrayList<>(); private final List outgoingInterceptors = new ArrayList<>(); - private final Map connectedClients = new ConcurrentHashMap<>(); - private final Map sessionStates = new ConcurrentHashMap<>(); - private int defaultMqttSessionExpiryInterval = -1; private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX; @@ -79,13 +74,23 @@ public class MQTTProtocolManager extends AbstractProtocolManager incomingInterceptors, - List outgoingInterceptors) { + List outgoingInterceptors) throws Exception { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); server.getManagementService().addNotificationListener(this); routingHandler = new MQTTRoutingHandler(server); + sessionStateManager = MQTTStateManager.getInstance(server); + server.registerActivateCallback(new CleaningActivateCallback() { + @Override + public void deActivate() { + MQTTStateManager.removeInstance(server); + sessionStateManager = null; + } + }); } public int getDefaultMqttSessionExpiryInterval() { @@ -176,7 +181,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager toRemove = new ArrayList(); - for (Map.Entry entry : sessionStates.entrySet()) { - MQTTSessionState state = entry.getValue(); - logger.debug("Inspecting session: {}", state); - int sessionExpiryInterval = getSessionExpiryInterval(state); - if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { - toRemove.add(entry.getKey()); - } - if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { - state.getSession().sendWillMessage(); - } - } - - for (String key : toRemove) { - logger.debug("Removing state for session: {}", key); - MQTTSessionState state = removeSessionState(key); - if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { - state.getSession().sendWillMessage(); - } - } - } - - private int getSessionExpiryInterval(MQTTSessionState state) { - int sessionExpiryInterval; - if (state.getClientSessionExpiryInterval() == 0) { - sessionExpiryInterval = getDefaultMqttSessionExpiryInterval(); - } else { - sessionExpiryInterval = state.getClientSessionExpiryInterval(); - } - return sessionExpiryInterval; - } - @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { try { @@ -348,56 +320,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager getSessionStates() { - return new HashMap<>(sessionStates); - } - - /** For DEBUG only */ - public Map getConnectedClients() { - return connectedClients; + public MQTTStateManager getStateManager() { + return sessionStateManager; } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 253ad66995..2264dbf3d5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -84,7 +84,7 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory { if (m instanceof MQTTProtocolManager) { - ((MQTTProtocolManager)m).scanSessions(); + ((MQTTProtocolManager)m).getStateManager().scanSessions(); } }); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index f6fa85c5de..ba70c45ccb 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -64,6 +64,8 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; + private MQTTStateManager stateManager; + private boolean clean; private WildcardConfiguration wildcardConfiguration; @@ -80,6 +82,7 @@ public class MQTTSession { WildcardConfiguration wildcardConfiguration) throws Exception { this.protocolHandler = protocolHandler; this.protocolManager = protocolManager; + this.stateManager = protocolManager.getStateManager(); this.wildcardConfiguration = wildcardConfiguration; this.connection = connection; @@ -87,7 +90,7 @@ public class MQTTSession { mqttConnectionManager = new MQTTConnectionManager(this); mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure()); sessionCallback = new MQTTSessionCallback(this, connection); - subscriptionManager = new MQTTSubscriptionManager(this); + subscriptionManager = new MQTTSubscriptionManager(this, stateManager); retainMessageManager = new MQTTRetainMessageManager(this); state = MQTTSessionState.DEFAULT; @@ -120,11 +123,9 @@ public class MQTTSession { internalServerSession.close(false); } - if (state != null) { - state.setAttached(false); - state.setDisconnectedTime(System.currentTimeMillis()); - state.clearTopicAliases(); - } + state.setAttached(false); + state.setDisconnectedTime(System.currentTimeMillis()); + state.clearTopicAliases(); if (getVersion() == MQTTVersion.MQTT_5) { if (state.getClientSessionExpiryInterval() == 0) { @@ -133,9 +134,7 @@ public class MQTTSession { sendWillMessage(); } clean(false); - protocolManager.removeSessionState(connection.getClientID()); - } else { - state.setDisconnectedTime(System.currentTimeMillis()); + stateManager.removeSessionState(connection.getClientID()); } } else { if (state.isWill() && failure) { @@ -143,7 +142,7 @@ public class MQTTSession { } if (isClean()) { clean(false); - protocolManager.removeSessionState(connection.getClientID()); + stateManager.removeSessionState(connection.getClientID()); } } } @@ -226,6 +225,10 @@ public class MQTTSession { return protocolManager; } + MQTTStateManager getStateManager() { + return stateManager; + } + void clean(boolean enforceSecurity) throws Exception { subscriptionManager.clean(enforceSecurity); mqttPublishManager.clean(); @@ -290,6 +293,9 @@ public class MQTTSession { @Override public String toString() { - return "MQTTSession[coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") + "]"; + return "MQTTSession[" + + "coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") + + ", clientId: " + state.getClientId() + + "]"; } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 33e6f159ed..5c63f1c152 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -30,9 +30,14 @@ import java.util.regex.Pattern; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.settings.impl.Match; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +47,13 @@ public class MQTTSessionState { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final MQTTSessionState DEFAULT = new MQTTSessionState(null); + public static final MQTTSessionState DEFAULT = new MQTTSessionState((String) null, null); private MQTTSession session; - private String clientId; + private final String clientId; + + private final MQTTStateManager stateManager; private final ConcurrentMap> subscriptions = new ConcurrentHashMap<>(); @@ -91,8 +98,50 @@ public class MQTTSessionState { private Map serverTopicAliases; - public MQTTSessionState(String clientId) { + public MQTTSessionState(String clientId, MQTTStateManager stateManager) { this.clientId = clientId; + this.stateManager = stateManager; + } + + /** + * This constructor deserializes session data from a message. The format is as follows. + * + * - byte: version + * - int: subscription count + * + * There may be 0 or more subscriptions. The subscription format is as follows. + * + * - String: topic name + * - int: QoS + * - boolean: no-local + * - boolean: retain as published + * - int: retain handling + * - int (nullable): subscription identifier + * + * @param message the message holding the MQTT session data + * @param stateManager the manager used to add and remove sessions from storage + */ + public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager) { + logger.debug("Deserializing MQTT session state from {}", message); + this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME); + this.stateManager = stateManager; + ActiveMQBuffer buf = message.getDataBuffer(); + + // no need to use the version at this point + byte version = buf.readByte(); + + int subscriptionCount = buf.readInt(); + logger.debug("Deserializing {} subscriptions", subscriptionCount); + for (int i = 0; i < subscriptionCount; i++) { + String topicName = buf.readString(); + MqttQoS qos = MqttQoS.valueOf(buf.readInt()); + boolean nolocal = buf.readBoolean(); + boolean retainAsPublished = buf.readBoolean(); + MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy = MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(buf.readInt()); + Integer subscriptionId = buf.readNullableInt(); + + subscriptions.put(topicName, new Pair<>(new MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal, retainAsPublished, retainedHandlingPolicy)), subscriptionId)); + } } public MQTTSession getSession() { @@ -103,7 +152,7 @@ public class MQTTSessionState { this.session = session; } - public synchronized void clear() { + public synchronized void clear() throws Exception { subscriptions.clear(); messageRefStore.clear(); addressMessageMap.clear(); @@ -148,7 +197,11 @@ public class MQTTSessionState { return result; } - public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) { + public Collection> getSubscriptionsPlusID() { + return subscriptions.values(); + } + + public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) throws Exception { // synchronized to prevent race with removeSubscription synchronized (subscriptions) { addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCoreAddress(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<>()); @@ -172,7 +225,7 @@ public class MQTTSessionState { } } - public void removeSubscription(String address) { + public void removeSubscription(String address) throws Exception { // synchronized to prevent race with addSubscription synchronized (subscriptions) { subscriptions.remove(address); @@ -184,6 +237,10 @@ public class MQTTSessionState { return subscriptions.get(address) != null ? subscriptions.get(address).getA() : null; } + public Pair getSubscriptionPlusID(String address) { + return subscriptions.get(address) != null ? subscriptions.get(address) : null; + } + public List getMatchingSubscriptionIdentifiers(String address) { address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, session.getServer().getConfiguration().getWildcardConfiguration()); List result = null; @@ -207,10 +264,6 @@ public class MQTTSessionState { return clientId; } - public void setClientId(String clientId) { - this.clientId = clientId; - } - public long getDisconnectedTime() { return disconnectedTime; } @@ -372,6 +425,29 @@ public class MQTTSessionState { } } + @Override + public String toString() { + return "MQTTSessionState[session=" + session + + ", clientId=" + clientId + + ", subscriptions=" + subscriptions + + ", messageRefStore=" + messageRefStore + + ", addressMessageMap=" + addressMessageMap + + ", pubRec=" + pubRec + + ", attached=" + attached + + ", outboundStore=" + outboundStore + + ", disconnectedTime=" + disconnectedTime + + ", sessionExpiryInterval=" + clientSessionExpiryInterval + + ", isWill=" + isWill + + ", willMessage=" + willMessage + + ", willTopic=" + willTopic + + ", willQoSLevel=" + willQoSLevel + + ", willRetain=" + willRetain + + ", willDelayInterval=" + willDelayInterval + + ", failed=" + failed + + ", maxPacketSize=" + clientMaxPacketSize + + "]@" + System.identityHashCode(this); + } + public class OutboundStore { private HashMap, Integer> artemisToMqttMessageMap = new HashMap<>(); @@ -445,11 +521,6 @@ public class MQTTSessionState { } } - @Override - public String toString() { - return "MQTTSessionState[" + "session=" + session + ", clientId='" + clientId + "', subscriptions=" + subscriptions + ", messageRefStore=" + messageRefStore + ", addressMessageMap=" + addressMessageMap + ", pubRec=" + pubRec + ", attached=" + attached + ", outboundStore=" + outboundStore + ", disconnectedTime=" + disconnectedTime + ", sessionExpiryInterval=" + clientSessionExpiryInterval + ", isWill=" + isWill + ", willMessage=" + willMessage + ", willTopic='" + willTopic + "', willQoSLevel=" + willQoSLevel + ", willRetain=" + willRetain + ", willDelayInterval=" + willDelayInterval + ", failed=" + failed + ", maxPacketSize=" + clientMaxPacketSize + ']'; - } - public enum WillStatus { NOT_SENT, SENT, SENDING; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java new file mode 100644 index 0000000000..aa913b8f3a --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTStateManager { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ActiveMQServer server; + private final Map sessionStates = new ConcurrentHashMap<>(); + private final Queue sessionStore; + private static Map INSTANCES = new HashMap<>(); + private final Map connectedClients = new ConcurrentHashMap<>(); + + /* + * Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want + * one instance of MQTTSessionStateManager per-broker with the understanding that there can be multiple brokers in + * the same JVM. + */ + public static synchronized MQTTStateManager getInstance(ActiveMQServer server) throws Exception { + MQTTStateManager instance = INSTANCES.get(System.identityHashCode(server)); + if (instance == null) { + instance = new MQTTStateManager(server); + INSTANCES.put(System.identityHashCode(server), instance); + } + + return instance; + } + + public static synchronized void removeInstance(ActiveMQServer server) { + INSTANCES.remove(System.identityHashCode(server)); + } + + private MQTTStateManager(ActiveMQServer server) throws Exception { + this.server = server; + sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true); + + // load session data from queue + try (LinkedListIterator iterator = sessionStore.browserIterator()) { + try { + while (iterator.hasNext()) { + MessageReference ref = iterator.next(); + String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME); + MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this); + sessionStates.put(clientId, sessionState); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing + } + } + } + + public void scanSessions() { + List toRemove = new ArrayList(); + for (Map.Entry entry : sessionStates.entrySet()) { + MQTTSessionState state = entry.getValue(); + logger.debug("Inspecting session: {}", state); + int sessionExpiryInterval = state.getClientSessionExpiryInterval(); + if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { + toRemove.add(entry.getKey()); + } + if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { + state.getSession().sendWillMessage(); + } + } + + for (String key : toRemove) { + try { + MQTTSessionState state = removeSessionState(key); + if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { + state.getSession().sendWillMessage(); + } + } catch (Exception e) { + MQTTLogger.LOGGER.failedToRemoveSessionState(key, e); + } + } + } + + public MQTTSessionState getSessionState(String clientId) throws Exception { + /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ + if (sessionStates.containsKey(clientId)) { + return sessionStates.get(clientId); + } else { + MQTTSessionState sessionState = new MQTTSessionState(clientId, this); + logger.debug("Adding MQTT session state for: {}", clientId); + sessionStates.put(clientId, sessionState); + storeSessionState(sessionState); + return sessionState; + } + } + + public MQTTSessionState removeSessionState(String clientId) throws Exception { + logger.debug("Removing MQTT session state for: {}", clientId); + if (clientId == null) { + return null; + } + removeDurableSessionState(clientId); + return sessionStates.remove(clientId); + } + + public void removeDurableSessionState(String clientId) throws Exception { + int deletedCount = sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" = '").append(clientId).append("'").toString())); + logger.debug("Removed {} durable MQTT state records for: {}", deletedCount, clientId); + } + + public Map getSessionStates() { + return new HashMap<>(sessionStates); + } + + @Override + public String toString() { + return "MQTTSessionStateManager@" + Integer.toHexString(System.identityHashCode(this)); + } + + public void storeSessionState(MQTTSessionState state) throws Exception { + logger.debug("Adding durable MQTT state record for: {}", state.getClientId()); + + /* + * It is imperative to ensure the routed message is actually *all the way* on the queue before proceeding + * otherwise there can be a race with removing it. + */ + CountDownLatch latch = new CountDownLatch(1); + Transaction tx = new TransactionImpl(server.getStorageManager()); + server.getPostOffice().route(serializeState(state, server.getStorageManager().generateID()), tx, false); + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + latch.countDown(); + } + }); + tx.commit(); + final long timeout = 5000; + if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { + throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout); + } + } + + public static CoreMessage serializeState(MQTTSessionState state, long messageID) { + CoreMessage message = new CoreMessage().initBuffer(50).setMessageID(messageID); + message.setAddress(MQTTUtil.MQTT_SESSION_STORE); + message.setDurable(true); + message.putStringProperty(Message.HDR_LAST_VALUE_NAME, state.getClientId()); + Collection> subscriptions = state.getSubscriptionsPlusID(); + ActiveMQBuffer buf = message.getBodyBuffer(); + + /* + * This byte represents the "version". If the payload changes at any point in the future then we can detect that + * and adjust so that when users are upgrading we can still read the old data format. + */ + buf.writeByte((byte) 0); + + buf.writeInt(subscriptions.size()); + logger.debug("Serializing {} subscriptions", subscriptions.size()); + for (Pair pair : subscriptions) { + MqttTopicSubscription sub = pair.getA(); + buf.writeString(sub.topicName()); + buf.writeInt(sub.option().qos().value()); + buf.writeBoolean(sub.option().isNoLocal()); + buf.writeBoolean(sub.option().isRetainAsPublished()); + buf.writeInt(sub.option().retainHandling().value()); + buf.writeNullableInt(pair.getB()); + } + + return message; + } + + public boolean isClientConnected(String clientId, MQTTConnection connection) { + MQTTConnection connectedConn = connectedClients.get(clientId); + + if (connectedConn != null) { + return connectedConn.equals(connection); + } + + return false; + } + + public boolean isClientConnected(String clientId) { + return connectedClients.containsKey(clientId); + } + + public void removeConnectedClient(String clientId) { + connectedClients.remove(clientId); + } + + /** + * @param clientId + * @param connection + * @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for + * the {@code clientId} + */ + public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { + return connectedClients.put(clientId, connection); + } + + public MQTTConnection getConnectedClient(String clientId) { + return connectedClients.get(clientId); + } + + /** For DEBUG only */ + public Map getConnectedClients() { + return connectedClients; + } +} \ No newline at end of file diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 176a1c18ca..dedacd55e9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -38,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; -import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH; import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING; @@ -47,6 +46,8 @@ public class MQTTSubscriptionManager { private final MQTTSession session; + private final MQTTStateManager stateManager; + private final ConcurrentMap consumerQoSLevels; private final ConcurrentMap consumers; @@ -66,8 +67,9 @@ public class MQTTSubscriptionManager { private final char anyWords; - public MQTTSubscriptionManager(MQTTSession session) { + public MQTTSubscriptionManager(MQTTSession session, MQTTStateManager stateManager) { this.session = session; + this.stateManager = stateManager; singleWord = session.getServer().getConfiguration().getWildcardConfiguration().getSingleWord(); anyWords = session.getServer().getConfiguration().getWildcardConfiguration().getAnyWords(); @@ -130,7 +132,7 @@ public class MQTTSubscriptionManager { session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier); } } catch (Exception e) { - // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed + // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue q.deleteQueue(); throw e; } @@ -246,56 +248,52 @@ public class MQTTSubscriptionManager { consumerQoSLevels.put(cid, qos); } - short[] removeSubscriptions(List topics) throws Exception { + short[] removeSubscriptions(List topics, boolean enforceSecurity) throws Exception { short[] reasonCodes; + MQTTSessionState state = session.getState(); - synchronized (session.getState()) { + synchronized (state) { reasonCodes = new short[topics.size()]; for (int i = 0; i < topics.size(); i++) { - reasonCodes[i] = removeSubscription(topics.get(i)); + if (session.getState().getSubscription(topics.get(i)) == null) { + reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED; + continue; + } + + short reasonCode = MQTTReasonCodes.SUCCESS; + + try { + session.getState().removeSubscription(topics.get(i)); + ServerConsumer removed = consumers.remove(parseTopicName(topics.get(i))); + if (removed != null) { + removed.close(false); + consumerQoSLevels.remove(removed.getID()); + } + + SimpleString internalQueueName = getQueueNameForTopic(topics.get(i)); + Queue queue = session.getServer().locateQueue(internalQueueName); + if (queue != null) { + if (queue.isConfigurationManaged()) { + queue.deleteAllReferences(); + } else if (!topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) { + session.getServerSession().deleteQueue(internalQueueName, enforceSecurity); + } + } + } catch (Exception e) { + MQTTLogger.LOGGER.errorRemovingSubscription(e); + reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR; + } + + reasonCodes[i] = reasonCode; } + + // store state after *all* requested subscriptions have been removed in memory + stateManager.storeSessionState(state); } return reasonCodes; } - private short removeSubscription(String address) { - return removeSubscription(address, true); - } - - private short removeSubscription(String topic, boolean enforceSecurity) { - if (session.getState().getSubscription(topic) == null) { - return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED; - } - - short reasonCode = MQTTReasonCodes.SUCCESS; - - try { - session.getState().removeSubscription(topic); - - ServerConsumer removed = consumers.remove(parseTopicName(topic)); - if (removed != null) { - removed.close(false); - consumerQoSLevels.remove(removed.getID()); - } - - SimpleString internalQueueName = getQueueNameForTopic(topic); - Queue queue = session.getServer().locateQueue(internalQueueName); - if (queue != null) { - if (queue.isConfigurationManaged()) { - queue.deleteAllReferences(); - } else if (!topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) { - session.getServerSession().deleteQueue(internalQueueName, enforceSecurity); - } - } - } catch (Exception e) { - MQTTLogger.LOGGER.errorRemovingSubscription(e); - reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR; - } - - return reasonCode; - } - private SimpleString getQueueNameForTopic(String topic) { if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { int slashIndex = topic.indexOf(SLASH) + 1; @@ -308,19 +306,15 @@ public class MQTTSubscriptionManager { } /** - * As per MQTT Spec. Subscribes this client to a number of MQTT topics. + * As per MQTT Spec. Subscribes this client to a number of MQTT topics. * * @param subscriptions * @return An array of integers representing the list of accepted QoS for each topic. * @throws Exception */ - int[] addSubscriptions(List subscriptions, MqttProperties properties) throws Exception { - synchronized (session.getState()) { - Integer subscriptionIdentifier = null; - if (properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()) != null) { - subscriptionIdentifier = (Integer) properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()).value(); - } - + int[] addSubscriptions(List subscriptions, Integer subscriptionIdentifier) throws Exception { + MQTTSessionState state = session.getState(); + synchronized (state) { int[] qos = new int[subscriptions.size()]; for (int i = 0; i < subscriptions.size(); i++) { @@ -354,6 +348,10 @@ public class MQTTSubscriptionManager { } } } + + // store state after *all* requested subscriptions have been created in memory + stateManager.storeSessionState(state); + return qos; } } @@ -362,9 +360,11 @@ public class MQTTSubscriptionManager { return consumerQoSLevels; } - void clean(boolean enforceSecurity) { + void clean(boolean enforceSecurity) throws Exception { + List topics = new ArrayList<>(); for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) { - removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity); + topics.add(mqttTopicSubscription.topicName()); } + removeSubscriptions(topics, enforceSecurity); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 1f77de5c1b..4b59ea2489 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -86,6 +86,8 @@ public class MQTTUtil { public static final char SLASH = '/'; + public static final String MQTT_SESSION_STORE = DOLLAR + "sys.mqtt.sessions"; + public static final String MQTT_RETAIN_ADDRESS_PREFIX = DOLLAR + "sys.mqtt.retain."; public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level"); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java new file mode 100644 index 0000000000..11d2f025bf --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StateSerDeTest { + + @Test(timeout = 30000) + public void testSerDe() throws Exception { + for (int i = 0; i < 500; i++) { + String clientId = RandomUtil.randomString(); + MQTTSessionState unserialized = new MQTTSessionState(clientId, null); + Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull(); + for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) { + MqttTopicSubscription sub = new MqttTopicSubscription(RandomUtil.randomString(), + new MqttSubscriptionOption(MqttQoS.valueOf(RandomUtil.randomInterval(0, 3)), + RandomUtil.randomBoolean(), + RandomUtil.randomBoolean(), + MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(RandomUtil.randomInterval(0, 3)))); + unserialized.addSubscription(sub, MQTTUtil.MQTT_WILDCARD, subscriptionIdentifier); + } + + CoreMessage serializedState = MQTTStateManager.serializeState(unserialized, 0); + MQTTSessionState deserialized = new MQTTSessionState(serializedState, null); + + assertEquals(unserialized.getClientId(), deserialized.getClientId()); + for (Pair unserializedEntry : unserialized.getSubscriptionsPlusID()) { + MqttTopicSubscription unserializedSub = unserializedEntry.getA(); + Integer unserializedSubId = unserializedEntry.getB(); + Pair deserializedEntry = deserialized.getSubscriptionPlusID(unserializedSub.topicName()); + MqttTopicSubscription deserializedSub = deserializedEntry.getA(); + Integer deserializedSubId = deserializedEntry.getB(); + + assertTrue(compareSubs(unserializedSub, deserializedSub)); + assertEquals(unserializedSubId, deserializedSubId); + } + } + } + + private boolean compareSubs(MqttTopicSubscription a, MqttTopicSubscription b) { + if (a == b) { + return true; + } + if (a == null || b == null) { + return false; + } + if (a.topicName() == null) { + if (b.topicName() != null) { + return false; + } + } else if (!a.topicName().equals(b.topicName())) { + return false; + } + if (a.option() == null) { + if (b.option() != null) { + return false; + } + } else { + if (a.option().qos() == null) { + if (b.option().qos() != null) { + return false; + } + } else if (a.option().qos().value() != b.option().qos().value()) { + return false; + } + if (a.option().retainHandling() == null) { + if (b.option().retainHandling() != null) { + return false; + } + } else if (a.option().retainHandling().value() != b.option().retainHandling().value()) { + return false; + } + if (a.option().isRetainAsPublished() != b.option().isRetainAsPublished()) { + return false; + } + if (a.option().isNoLocal() != b.option().isNoLocal()) { + return false; + } + } + + return true; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java index 883b116ec7..09592b43f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java @@ -40,4 +40,9 @@ public class InVMAcceptorFactory implements AcceptorFactory { final Map protocolMap) { return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, protocolMap, threadPool); } + + @Override + public boolean supportsRemote() { + return false; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 9f948355d9..eeacf86c8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2226,6 +2226,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Transaction tx = new TransactionImpl(storageManager); synchronized (this) { + // ensure all messages are moved from intermediateMessageReferences so that they can be seen by the iterator + doInternalPoll(); try (LinkedListIterator iter = iterator()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java index 4390d4f0c5..2416e17efc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java @@ -51,4 +51,8 @@ public interface AcceptorFactory { ScheduledExecutorService scheduledThreadPool, Map protocolMap); + default boolean supportsRemote() { + return true; + } + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 6e32ca69e1..2e13383dc5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -585,6 +585,9 @@ public abstract class ActiveMQTestBase extends Assert { if (netty) { configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap(), "netty", new HashMap())); + } else { + // if we're in-vm it's a waste to resolve protocols since they'll never be used + configuration.setResolveProtocols(false); } return configuration; diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc index 106af544e8..d460279bd8 100644 --- a/docs/user-manual/mqtt.adoc +++ b/docs/user-manual/mqtt.adoc @@ -89,6 +89,12 @@ As far as the broker is concerned a payload is just an array of bytes. However, to facilitate logging the broker will encode the payloads as UTF-8 strings and print them up to 256 characters. Payload logging is limited to avoid filling the logs with potentially hundreds of megabytes of unhelpful information. +== Persistent Subscriptions + +The subscription information for MQTT sessions is stored in an internal queue named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is enabled). +The information is durable so that MQTT subscribers can reconnect and resume their subscriptions seamlessly after a broker restart, failure, etc. +When brokers are configured for high availability this information will be available on the backup so even in the case of a broker fail-over subscribers will be able to resume their subscriptions. + == Custom Client ID Handling The client ID used by an MQTT application is very important as it uniquely identifies the application. diff --git a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml index a6fcc0b7d7..020338df2b 100644 --- a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml @@ -45,7 +45,7 @@ under the License. - + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java index c135dc5089..bf0580496b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java @@ -28,6 +28,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; import org.apache.qpid.jms.JmsConnectionFactory; @@ -58,6 +60,21 @@ public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase { this.protocol = protocol; } + @Override + protected void createConfigs() throws Exception { + nodeManager = createNodeManager(); + TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + + backupConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName())); + + backupServer = createTestableServer(backupConfig); + + liveConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); + + liveServer = createTestableServer(liveConfig); + } + @Override protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { return getNettyAcceptorTransportConfig(live); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java index 53263d3772..feed08107d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java @@ -59,8 +59,8 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport { super.setUp(); createReplicatedConfigs(); - liveConfig.addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); - backupConfig.addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); + liveConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); + backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); liveServer.start(); backupServer.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java index 4e75fa3df9..fb3be03d54 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java @@ -288,6 +288,7 @@ public class MessageExpirationTest extends ActiveMQTestBase { server = createServer(true); server.getConfiguration().addAcceptorConfiguration("amqp", "tcp://127.0.0.1:61616"); + server.getConfiguration().setResolveProtocols(true); server.getConfiguration().setMessageExpiryScanPeriod(200); server.start(); locator = createInVMNonHALocator(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java index 657676dc51..558023a64d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java @@ -40,9 +40,9 @@ public class UpdateQueueTest extends ActiveMQTestBase { @Test public void testUpdateQueueWithNullUser() throws Exception { - ActiveMQServer server = createServer(true, true); + ActiveMQServer server = createServer(true, false); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0"); server.start(); @@ -80,7 +80,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { Assert.assertEquals("newUser", user, queue.getUser()); - factory = new ActiveMQConnectionFactory(); + factory = new ActiveMQConnectionFactory("vm://0"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -110,9 +110,9 @@ public class UpdateQueueTest extends ActiveMQTestBase { @Test public void testUpdateQueue() throws Exception { - ActiveMQServer server = createServer(true, true); + ActiveMQServer server = createServer(true, false); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0"); server.start(); @@ -169,7 +169,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { Assert.assertEquals("newUser", queue.getUser().toString()); Assert.assertEquals(180L, queue.getRingSize()); - factory = new ActiveMQConnectionFactory(); + factory = new ActiveMQConnectionFactory("vm://0"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java index 03bed28ffc..72b6474bb5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java @@ -86,7 +86,7 @@ public class MessageJournalTest extends ActiveMQTestBase { @Test public void testStoreAMQP() throws Throwable { - ActiveMQServer server = createServer(true); + ActiveMQServer server = createServer(true, true); server.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java index a2c7bde455..7fda8a1b2b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.junit.Test; @@ -34,8 +35,9 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - assertEquals(1, server.getPostOffice().getAllBindings().count()); - Binding b = server.getPostOffice().getAllBindings().iterator().next(); + Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah")); + assertEquals(1, bindings.size()); + Binding b = bindings.getBindings().iterator().next(); //check that query using bare queue name works as before QueueQueryResult result = server.queueQuery(b.getUniqueName()); assertTrue(result.isExists()); @@ -126,8 +128,9 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - assertEquals(1, server.getPostOffice().getAllBindings().count()); - Binding b = server.getPostOffice().getAllBindings().iterator().next(); + Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah")); + assertEquals(1, bindings.size()); + Binding b = bindings.getBindings().iterator().next(); //check ::queue QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java index f55876101b..802af06186 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java @@ -100,7 +100,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates(); + sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } assertEquals(1, sessionStates.size()); @@ -132,7 +132,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates(); + sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } assertEquals(1, sessionStates.size()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java index e332da72a7..1a5e673a01 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java @@ -2099,10 +2099,10 @@ public class MQTTTest extends MQTTTestSupport { final int port2 = 1885; final Configuration cfg1 = createDefaultConfig(1, false); - cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT"); + cfg1.setResolveProtocols(true).addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT"); final Configuration cfg2 = createDefaultConfig(2, false); - cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT"); + cfg2.setResolveProtocols(true).addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT"); final ActiveMQServer server1 = createServer(cfg1); server1.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java index 9f8bcd0d0a..491c065fe3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java @@ -387,7 +387,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - return ((MQTTProtocolManager) protocolManager).getSessionStates(); + return ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java index 4f7d35b1d8..04eaf7a88f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java @@ -68,14 +68,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; subConnection2.subscribe(topics); @@ -258,14 +258,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; @@ -456,14 +456,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; subConnection2.subscribe(topics); @@ -622,7 +622,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -711,9 +711,9 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); // Subscribe to topics connection1.subscribe(topics); @@ -924,7 +924,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -1029,7 +1029,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(2, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); //Waiting for the first sub connection be closed diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java index 32de7c147c..8a6341a8b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -221,7 +222,14 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport { messageConsumer.close(); messageConsumerAllNews.close(); - int countOfPageStores = server.getPagingManager().getStoreNames().length; + int countOfPageStores = 0; + SimpleString[] storeNames = server.getPagingManager().getStoreNames(); + for (int i = 0; i < storeNames.length; i++) { + if (!storeNames[i].equals(SimpleString.toSimpleString(MQTTUtil.MQTT_SESSION_STORE))) { + countOfPageStores++; + } + } + assertEquals("there should be 5", 5, countOfPageStores); connection.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 00e3a95a4b..e806d2a9e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -45,6 +45,7 @@ import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.UserProperty; import org.junit.Test; @@ -82,6 +83,69 @@ public class MQTT5Test extends MQTT5TestSupport { context.close(); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testResumeSubscriptionsAfterRestart() throws Exception { + final int SUBSCRIPTION_COUNT = 100; + List topicNames = new ArrayList<>(SUBSCRIPTION_COUNT); + for (int i = 0; i < SUBSCRIPTION_COUNT; i++) { + topicNames.add(getName() + i); + } + + CountDownLatch latch = new CountDownLatch(SUBSCRIPTION_COUNT); + MqttClient consumer = createPahoClient("myConsumerID"); + MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder() + .cleanStart(false) + .sessionExpiryInterval(999L) + .build(); + consumer.connect(consumerOptions); + List subs = new ArrayList<>(SUBSCRIPTION_COUNT); + for (String subName : topicNames) { + subs.add(new MqttSubscription(subName, 1)); + } + consumer.subscribe(subs.toArray(new MqttSubscription[0])); + consumer.disconnect(); + + MqttClient producer = createPahoClient("myProducerID"); + MqttConnectionOptions producerOptions = new MqttConnectionOptionsBuilder() + .sessionExpiryInterval(0L) + .build(); + producer.connect(producerOptions); + for (String subName : topicNames) { + producer.publish(subName, new byte[0], 1, false); + } + producer.disconnect(); + producer.close(); + + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100); + + server.stop(); + server.start(); + + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100); + Wait.assertTrue(() -> getSessionStates().get("myConsumerID") != null, 2000, 100); + consumer.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + if (topicNames.remove(topic)) { + latch.countDown(); + } + } + }); + consumerOptions = new MqttConnectionOptionsBuilder() + .cleanStart(false) + .sessionExpiryInterval(0L) + .build(); + consumer.connect(consumerOptions); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + consumer.unsubscribe(topicNames.toArray(new String[0])); + consumer.disconnect(); + consumer.close(); + Wait.assertEquals(0L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 5000, 100); + } + + /* + * Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184 + */ @Test(timeout = DEFAULT_TIMEOUT) public void testAddressAutoCreation() throws Exception { final String DESTINATION = RandomUtil.randomString(); @@ -470,6 +534,60 @@ public class MQTT5Test extends MQTT5TestSupport { client.close(); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingOnMultipleAcceptors() throws Exception { + int secondaryPort = 1884; + final String CLIENT_ID = RandomUtil.randomString(); + + server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort); + server.getRemotingService().startAcceptors(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + + MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort); + client2.connect(); + + // only 1 session should exist + Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertFalse(client.isConnected()); + + client.close(); + client2.disconnect(); + client2.close(); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingDisabledOnMultipleAcceptors() throws Exception { + int secondaryPort = 1884; + final String CLIENT_ID = RandomUtil.randomString(); + + server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort + "?allowLinkStealing=false"); + server.getRemotingService().startAcceptors(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + + MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort); + try { + client2.connect(); + fail("Should have thrown an exception on connect due to disabled link stealing"); + } catch (Exception e) { + // ignore expected exception + } + + // only 1 session should exist + Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertTrue(client.isConnected()); + + client.disconnect(); + client.close(); + } + @Test(timeout = DEFAULT_TIMEOUT) public void testQueueCleanedUpOnConsumerFail() throws Exception { final String topic = getName(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java index 6b191e0e34..70b0a34733 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java @@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase { return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence()); } + protected MqttClient createPahoClient(String clientId, int port) throws MqttException { + return new MqttClient(protocol + "://localhost:" + port, clientId, new MemoryPersistence()); + } + protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException { return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()); } @@ -333,12 +337,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase { if (protocolManager == null) { return Collections.emptyMap(); } else { - return protocolManager.getSessionStates(); + return protocolManager.getStateManager().getSessionStates(); } } public void scanSessions() { - getProtocolManager().scanSessions(); + getProtocolManager().getStateManager().scanSessions(); } public MQTTProtocolManager getProtocolManager() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java index c6b4c3b088..60d1302a98 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java @@ -76,7 +76,13 @@ public class MessageReceiptTests extends MQTT5TestSupport { for (int i = 0; i < CONSUMER_COUNT; i++) { producer.publish(TOPIC + i, ("hello" + i).getBytes(), 0, false); } - Wait.assertEquals((long) CONSUMER_COUNT, () -> server.getActiveMQServerControl().getTotalMessagesAdded(), 2000, 100); + Wait.assertEquals((long) CONSUMER_COUNT, () -> { + int totalMessagesAdded = 0; + for (int i = 0; i < CONSUMER_COUNT; i++) { + totalMessagesAdded += getSubscriptionQueue(TOPIC + i).getMessagesAdded(); + } + return totalMessagesAdded; + }, 2000, 100); producer.disconnect(); producer.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java index 34bc04e54f..64c84c4945 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java @@ -99,7 +99,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { @Test public void testJAASSecurityManagerAuthentication() throws Exception { - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); server.start(); try (Connection c = cf.createConnection("first", "secret")) { Thread.sleep(200); @@ -113,7 +113,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { final SimpleString ADDRESS = new SimpleString("address"); ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(); - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); Set roles = new HashSet<>(); roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false)); server.getConfiguration().putSecurityRoles("#", roles); @@ -163,7 +163,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { public void testJAASSecurityManagerAuthorizationPositive() throws Exception { final String ADDRESS = "address"; - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); Set roles = new HashSet<>(); roles.add(new Role("programmers", true, true, true, true, true, true, true, true, true, true)); server.getConfiguration().putSecurityRoles("#", roles); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java index cc22d6389b..3526c3cec8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java @@ -328,7 +328,7 @@ public class SecurityTest extends ActiveMQTestBase { params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass"); params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); - server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); + server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); // ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory HashSet roles = new HashSet<>(); @@ -374,7 +374,7 @@ public class SecurityTest extends ActiveMQTestBase { params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass"); params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); - server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); + server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); server.start(); ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616?verifyHostName=false");