diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java index 8ddd5f5871..54f44bd139 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java @@ -60,6 +60,11 @@ public class RandomUtil { return Math.abs(RandomUtil.randomInt()); } + public static Integer randomPositiveIntOrNull() { + Integer random = RandomUtil.randomInt(); + return random % 5 == 0 ? null : Math.abs(random); + } + public static ActiveMQBuffer randomBuffer(final int size, final long... data) { ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index a0c73d6979..8214e6d628 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1270,9 +1270,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public String toString() { try { final TypedProperties properties = getProperties(); - return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + - ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + - ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this); + return "CoreMessage[messageID=" + messageID + + ", durable=" + isDurable() + + ", userID=" + getUserID() + + ", priority=" + this.getPriority() + + ", timestamp=" + toDate(getTimestamp()) + + ", expiration=" + toDate(getExpiration()) + + ", durable=" + durable + + ", address=" + getAddress() + + ", size=" + getPersistentSize() + + ", properties=" + properties + + "]@" + System.identityHashCode(this); } catch (Throwable e) { logger.warn("Error creating String for message: ", e); return "ServerMessage[messageID=" + messageID + "]"; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java new file mode 100644 index 0000000000..0bbecb362e --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTBundle.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.mqtt; + +import org.apache.activemq.artemis.logs.BundleFactory; +import org.apache.activemq.artemis.logs.annotation.LogBundle; +import org.apache.activemq.artemis.logs.annotation.Message; + +/** + * Logger Code 85 + */ +@LogBundle(projectCode = "AMQ", regexID = "85[0-9]{4}") +public interface MQTTBundle { + + MQTTBundle BUNDLE = BundleFactory.newBundle(MQTTBundle.class); + + @Message(id = 850000, value = "Unable to store MQTT state within given timeout: {}ms") + IllegalStateException unableToStoreMqttState(long timeout); +} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 1cdcc149e4..eecc76166d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -67,7 +67,7 @@ public class MQTTConnectionManager { boolean cleanStart = connect.variableHeader().isCleanSession(); String clientId = session.getConnection().getClientID(); - boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId); + boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId); MQTTSessionState sessionState = getSessionState(clientId); synchronized (sessionState) { session.setSessionState(sessionState); @@ -120,6 +120,7 @@ public class MQTTConnectionManager { connackProperties = getConnackProperties(); } else { + sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval()); connackProperties = MqttProperties.NO_PROPERTIES; } @@ -193,15 +194,15 @@ public class MQTTConnectionManager { * ensure that the connection for the client ID matches *this* connection otherwise we could remove the * entry for the client who "stole" this client ID via [MQTT-3.1.4-2] */ - if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) { - session.getProtocolManager().removeConnectedClient(clientId); + if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) { + session.getStateManager().removeConnectedClient(clientId); } } } } } - private synchronized MQTTSessionState getSessionState(String clientId) { - return session.getProtocolManager().getSessionState(clientId); + private synchronized MQTTSessionState getSessionState(String clientId) throws Exception { + return session.getStateManager().getSessionState(clientId); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java index 6611482b41..834dadba2c 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java @@ -58,4 +58,7 @@ public interface MQTTLogger { @LogMessage(id = 834007, value = "Authorization failure sending will message: {}", level = LogMessage.Level.ERROR) void authorizationFailureSendingWillMessage(String message); + + @LogMessage(id = 834008, value = "Failed to remove session state for client with ID: {}", level = LogMessage.Level.ERROR) + void failedToRemoveSessionState(String clientID, Exception e); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 8ea0a5db66..3c5c139a0d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -44,8 +44,8 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException; +import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; @@ -57,6 +57,7 @@ import java.lang.invoke.MethodHandles; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the @@ -257,7 +258,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) { return; } else { - protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection()); + protocolManager.getStateManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection()); } if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) { @@ -377,7 +378,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleSubscribe(MqttSubscribeMessage message) throws Exception { - int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), message.idAndPropertiesVariableHeader().properties()); + Integer subscriptionIdentifier = MQTTUtil.getProperty(Integer.class, message.idAndPropertiesVariableHeader().properties(), SUBSCRIPTION_IDENTIFIER, null); + int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), subscriptionIdentifier); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(message.variableHeader().messageId(), MqttProperties.NO_PROPERTIES); MqttSubAckMessage subAck = new MqttSubAckMessage(header, variableHeader, new MqttSubAckPayload(qos)); @@ -385,7 +387,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { - short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); + short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics(), true); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage unsubAck; if (session.getVersion() == MQTTVersion.MQTT_5) { @@ -462,14 +464,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { * * However, this behavior is configurable via the "allowLinkStealing" acceptor URL property. */ - private LinkStealingResult handleLinkStealing() { + private LinkStealingResult handleLinkStealing() throws Exception { final String clientID = session.getConnection().getClientID(); LinkStealingResult result; - if (protocolManager.isClientConnected(clientID)) { - MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID); + if (protocolManager.getStateManager().isClientConnected(clientID)) { + MQTTConnection existingConnection = protocolManager.getStateManager().getConnectedClient(clientID); if (protocolManager.isAllowLinkStealing()) { - MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession(); + MQTTSession existingSession = protocolManager.getStateManager().getSessionState(clientID).getSession(); if (existingSession != null) { if (existingSession.getVersion() == MQTTVersion.MQTT_5) { existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index fef99a75a0..8d5d2d9e28 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,12 +16,10 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -36,6 +34,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; @@ -47,7 +46,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; public class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { @@ -60,9 +58,6 @@ public class MQTTProtocolManager extends AbstractProtocolManager incomingInterceptors = new ArrayList<>(); private final List outgoingInterceptors = new ArrayList<>(); - private final Map connectedClients = new ConcurrentHashMap<>(); - private final Map sessionStates = new ConcurrentHashMap<>(); - private int defaultMqttSessionExpiryInterval = -1; private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX; @@ -79,13 +74,23 @@ public class MQTTProtocolManager extends AbstractProtocolManager incomingInterceptors, - List outgoingInterceptors) { + List outgoingInterceptors) throws Exception { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); server.getManagementService().addNotificationListener(this); routingHandler = new MQTTRoutingHandler(server); + sessionStateManager = MQTTStateManager.getInstance(server); + server.registerActivateCallback(new CleaningActivateCallback() { + @Override + public void deActivate() { + MQTTStateManager.removeInstance(server); + sessionStateManager = null; + } + }); } public int getDefaultMqttSessionExpiryInterval() { @@ -176,7 +181,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager toRemove = new ArrayList(); - for (Map.Entry entry : sessionStates.entrySet()) { - MQTTSessionState state = entry.getValue(); - logger.debug("Inspecting session: {}", state); - int sessionExpiryInterval = getSessionExpiryInterval(state); - if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { - toRemove.add(entry.getKey()); - } - if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { - state.getSession().sendWillMessage(); - } - } - - for (String key : toRemove) { - logger.debug("Removing state for session: {}", key); - MQTTSessionState state = removeSessionState(key); - if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { - state.getSession().sendWillMessage(); - } - } - } - - private int getSessionExpiryInterval(MQTTSessionState state) { - int sessionExpiryInterval; - if (state.getClientSessionExpiryInterval() == 0) { - sessionExpiryInterval = getDefaultMqttSessionExpiryInterval(); - } else { - sessionExpiryInterval = state.getClientSessionExpiryInterval(); - } - return sessionExpiryInterval; - } - @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { try { @@ -348,56 +320,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager getSessionStates() { - return new HashMap<>(sessionStates); - } - - /** For DEBUG only */ - public Map getConnectedClients() { - return connectedClients; + public MQTTStateManager getStateManager() { + return sessionStateManager; } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 253ad66995..2264dbf3d5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -84,7 +84,7 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory { if (m instanceof MQTTProtocolManager) { - ((MQTTProtocolManager)m).scanSessions(); + ((MQTTProtocolManager)m).getStateManager().scanSessions(); } }); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index f6fa85c5de..ba70c45ccb 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -64,6 +64,8 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; + private MQTTStateManager stateManager; + private boolean clean; private WildcardConfiguration wildcardConfiguration; @@ -80,6 +82,7 @@ public class MQTTSession { WildcardConfiguration wildcardConfiguration) throws Exception { this.protocolHandler = protocolHandler; this.protocolManager = protocolManager; + this.stateManager = protocolManager.getStateManager(); this.wildcardConfiguration = wildcardConfiguration; this.connection = connection; @@ -87,7 +90,7 @@ public class MQTTSession { mqttConnectionManager = new MQTTConnectionManager(this); mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure()); sessionCallback = new MQTTSessionCallback(this, connection); - subscriptionManager = new MQTTSubscriptionManager(this); + subscriptionManager = new MQTTSubscriptionManager(this, stateManager); retainMessageManager = new MQTTRetainMessageManager(this); state = MQTTSessionState.DEFAULT; @@ -120,11 +123,9 @@ public class MQTTSession { internalServerSession.close(false); } - if (state != null) { - state.setAttached(false); - state.setDisconnectedTime(System.currentTimeMillis()); - state.clearTopicAliases(); - } + state.setAttached(false); + state.setDisconnectedTime(System.currentTimeMillis()); + state.clearTopicAliases(); if (getVersion() == MQTTVersion.MQTT_5) { if (state.getClientSessionExpiryInterval() == 0) { @@ -133,9 +134,7 @@ public class MQTTSession { sendWillMessage(); } clean(false); - protocolManager.removeSessionState(connection.getClientID()); - } else { - state.setDisconnectedTime(System.currentTimeMillis()); + stateManager.removeSessionState(connection.getClientID()); } } else { if (state.isWill() && failure) { @@ -143,7 +142,7 @@ public class MQTTSession { } if (isClean()) { clean(false); - protocolManager.removeSessionState(connection.getClientID()); + stateManager.removeSessionState(connection.getClientID()); } } } @@ -226,6 +225,10 @@ public class MQTTSession { return protocolManager; } + MQTTStateManager getStateManager() { + return stateManager; + } + void clean(boolean enforceSecurity) throws Exception { subscriptionManager.clean(enforceSecurity); mqttPublishManager.clean(); @@ -290,6 +293,9 @@ public class MQTTSession { @Override public String toString() { - return "MQTTSession[coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") + "]"; + return "MQTTSession[" + + "coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") + + ", clientId: " + state.getClientId() + + "]"; } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 33e6f159ed..5c63f1c152 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -30,9 +30,14 @@ import java.util.regex.Pattern; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.settings.impl.Match; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +47,13 @@ public class MQTTSessionState { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final MQTTSessionState DEFAULT = new MQTTSessionState(null); + public static final MQTTSessionState DEFAULT = new MQTTSessionState((String) null, null); private MQTTSession session; - private String clientId; + private final String clientId; + + private final MQTTStateManager stateManager; private final ConcurrentMap> subscriptions = new ConcurrentHashMap<>(); @@ -91,8 +98,50 @@ public class MQTTSessionState { private Map serverTopicAliases; - public MQTTSessionState(String clientId) { + public MQTTSessionState(String clientId, MQTTStateManager stateManager) { this.clientId = clientId; + this.stateManager = stateManager; + } + + /** + * This constructor deserializes session data from a message. The format is as follows. + * + * - byte: version + * - int: subscription count + * + * There may be 0 or more subscriptions. The subscription format is as follows. + * + * - String: topic name + * - int: QoS + * - boolean: no-local + * - boolean: retain as published + * - int: retain handling + * - int (nullable): subscription identifier + * + * @param message the message holding the MQTT session data + * @param stateManager the manager used to add and remove sessions from storage + */ + public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager) { + logger.debug("Deserializing MQTT session state from {}", message); + this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME); + this.stateManager = stateManager; + ActiveMQBuffer buf = message.getDataBuffer(); + + // no need to use the version at this point + byte version = buf.readByte(); + + int subscriptionCount = buf.readInt(); + logger.debug("Deserializing {} subscriptions", subscriptionCount); + for (int i = 0; i < subscriptionCount; i++) { + String topicName = buf.readString(); + MqttQoS qos = MqttQoS.valueOf(buf.readInt()); + boolean nolocal = buf.readBoolean(); + boolean retainAsPublished = buf.readBoolean(); + MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy = MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(buf.readInt()); + Integer subscriptionId = buf.readNullableInt(); + + subscriptions.put(topicName, new Pair<>(new MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal, retainAsPublished, retainedHandlingPolicy)), subscriptionId)); + } } public MQTTSession getSession() { @@ -103,7 +152,7 @@ public class MQTTSessionState { this.session = session; } - public synchronized void clear() { + public synchronized void clear() throws Exception { subscriptions.clear(); messageRefStore.clear(); addressMessageMap.clear(); @@ -148,7 +197,11 @@ public class MQTTSessionState { return result; } - public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) { + public Collection> getSubscriptionsPlusID() { + return subscriptions.values(); + } + + public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) throws Exception { // synchronized to prevent race with removeSubscription synchronized (subscriptions) { addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCoreAddress(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<>()); @@ -172,7 +225,7 @@ public class MQTTSessionState { } } - public void removeSubscription(String address) { + public void removeSubscription(String address) throws Exception { // synchronized to prevent race with addSubscription synchronized (subscriptions) { subscriptions.remove(address); @@ -184,6 +237,10 @@ public class MQTTSessionState { return subscriptions.get(address) != null ? subscriptions.get(address).getA() : null; } + public Pair getSubscriptionPlusID(String address) { + return subscriptions.get(address) != null ? subscriptions.get(address) : null; + } + public List getMatchingSubscriptionIdentifiers(String address) { address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, session.getServer().getConfiguration().getWildcardConfiguration()); List result = null; @@ -207,10 +264,6 @@ public class MQTTSessionState { return clientId; } - public void setClientId(String clientId) { - this.clientId = clientId; - } - public long getDisconnectedTime() { return disconnectedTime; } @@ -372,6 +425,29 @@ public class MQTTSessionState { } } + @Override + public String toString() { + return "MQTTSessionState[session=" + session + + ", clientId=" + clientId + + ", subscriptions=" + subscriptions + + ", messageRefStore=" + messageRefStore + + ", addressMessageMap=" + addressMessageMap + + ", pubRec=" + pubRec + + ", attached=" + attached + + ", outboundStore=" + outboundStore + + ", disconnectedTime=" + disconnectedTime + + ", sessionExpiryInterval=" + clientSessionExpiryInterval + + ", isWill=" + isWill + + ", willMessage=" + willMessage + + ", willTopic=" + willTopic + + ", willQoSLevel=" + willQoSLevel + + ", willRetain=" + willRetain + + ", willDelayInterval=" + willDelayInterval + + ", failed=" + failed + + ", maxPacketSize=" + clientMaxPacketSize + + "]@" + System.identityHashCode(this); + } + public class OutboundStore { private HashMap, Integer> artemisToMqttMessageMap = new HashMap<>(); @@ -445,11 +521,6 @@ public class MQTTSessionState { } } - @Override - public String toString() { - return "MQTTSessionState[" + "session=" + session + ", clientId='" + clientId + "', subscriptions=" + subscriptions + ", messageRefStore=" + messageRefStore + ", addressMessageMap=" + addressMessageMap + ", pubRec=" + pubRec + ", attached=" + attached + ", outboundStore=" + outboundStore + ", disconnectedTime=" + disconnectedTime + ", sessionExpiryInterval=" + clientSessionExpiryInterval + ", isWill=" + isWill + ", willMessage=" + willMessage + ", willTopic='" + willTopic + "', willQoSLevel=" + willQoSLevel + ", willRetain=" + willRetain + ", willDelayInterval=" + willDelayInterval + ", failed=" + failed + ", maxPacketSize=" + clientMaxPacketSize + ']'; - } - public enum WillStatus { NOT_SENT, SENT, SENDING; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java new file mode 100644 index 0000000000..aa913b8f3a --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTStateManager { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ActiveMQServer server; + private final Map sessionStates = new ConcurrentHashMap<>(); + private final Queue sessionStore; + private static Map INSTANCES = new HashMap<>(); + private final Map connectedClients = new ConcurrentHashMap<>(); + + /* + * Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want + * one instance of MQTTSessionStateManager per-broker with the understanding that there can be multiple brokers in + * the same JVM. + */ + public static synchronized MQTTStateManager getInstance(ActiveMQServer server) throws Exception { + MQTTStateManager instance = INSTANCES.get(System.identityHashCode(server)); + if (instance == null) { + instance = new MQTTStateManager(server); + INSTANCES.put(System.identityHashCode(server), instance); + } + + return instance; + } + + public static synchronized void removeInstance(ActiveMQServer server) { + INSTANCES.remove(System.identityHashCode(server)); + } + + private MQTTStateManager(ActiveMQServer server) throws Exception { + this.server = server; + sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true); + + // load session data from queue + try (LinkedListIterator iterator = sessionStore.browserIterator()) { + try { + while (iterator.hasNext()) { + MessageReference ref = iterator.next(); + String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME); + MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this); + sessionStates.put(clientId, sessionState); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing + } + } + } + + public void scanSessions() { + List toRemove = new ArrayList(); + for (Map.Entry entry : sessionStates.entrySet()) { + MQTTSessionState state = entry.getValue(); + logger.debug("Inspecting session: {}", state); + int sessionExpiryInterval = state.getClientSessionExpiryInterval(); + if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { + toRemove.add(entry.getKey()); + } + if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { + state.getSession().sendWillMessage(); + } + } + + for (String key : toRemove) { + try { + MQTTSessionState state = removeSessionState(key); + if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { + state.getSession().sendWillMessage(); + } + } catch (Exception e) { + MQTTLogger.LOGGER.failedToRemoveSessionState(key, e); + } + } + } + + public MQTTSessionState getSessionState(String clientId) throws Exception { + /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ + if (sessionStates.containsKey(clientId)) { + return sessionStates.get(clientId); + } else { + MQTTSessionState sessionState = new MQTTSessionState(clientId, this); + logger.debug("Adding MQTT session state for: {}", clientId); + sessionStates.put(clientId, sessionState); + storeSessionState(sessionState); + return sessionState; + } + } + + public MQTTSessionState removeSessionState(String clientId) throws Exception { + logger.debug("Removing MQTT session state for: {}", clientId); + if (clientId == null) { + return null; + } + removeDurableSessionState(clientId); + return sessionStates.remove(clientId); + } + + public void removeDurableSessionState(String clientId) throws Exception { + int deletedCount = sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" = '").append(clientId).append("'").toString())); + logger.debug("Removed {} durable MQTT state records for: {}", deletedCount, clientId); + } + + public Map getSessionStates() { + return new HashMap<>(sessionStates); + } + + @Override + public String toString() { + return "MQTTSessionStateManager@" + Integer.toHexString(System.identityHashCode(this)); + } + + public void storeSessionState(MQTTSessionState state) throws Exception { + logger.debug("Adding durable MQTT state record for: {}", state.getClientId()); + + /* + * It is imperative to ensure the routed message is actually *all the way* on the queue before proceeding + * otherwise there can be a race with removing it. + */ + CountDownLatch latch = new CountDownLatch(1); + Transaction tx = new TransactionImpl(server.getStorageManager()); + server.getPostOffice().route(serializeState(state, server.getStorageManager().generateID()), tx, false); + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + latch.countDown(); + } + }); + tx.commit(); + final long timeout = 5000; + if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { + throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout); + } + } + + public static CoreMessage serializeState(MQTTSessionState state, long messageID) { + CoreMessage message = new CoreMessage().initBuffer(50).setMessageID(messageID); + message.setAddress(MQTTUtil.MQTT_SESSION_STORE); + message.setDurable(true); + message.putStringProperty(Message.HDR_LAST_VALUE_NAME, state.getClientId()); + Collection> subscriptions = state.getSubscriptionsPlusID(); + ActiveMQBuffer buf = message.getBodyBuffer(); + + /* + * This byte represents the "version". If the payload changes at any point in the future then we can detect that + * and adjust so that when users are upgrading we can still read the old data format. + */ + buf.writeByte((byte) 0); + + buf.writeInt(subscriptions.size()); + logger.debug("Serializing {} subscriptions", subscriptions.size()); + for (Pair pair : subscriptions) { + MqttTopicSubscription sub = pair.getA(); + buf.writeString(sub.topicName()); + buf.writeInt(sub.option().qos().value()); + buf.writeBoolean(sub.option().isNoLocal()); + buf.writeBoolean(sub.option().isRetainAsPublished()); + buf.writeInt(sub.option().retainHandling().value()); + buf.writeNullableInt(pair.getB()); + } + + return message; + } + + public boolean isClientConnected(String clientId, MQTTConnection connection) { + MQTTConnection connectedConn = connectedClients.get(clientId); + + if (connectedConn != null) { + return connectedConn.equals(connection); + } + + return false; + } + + public boolean isClientConnected(String clientId) { + return connectedClients.containsKey(clientId); + } + + public void removeConnectedClient(String clientId) { + connectedClients.remove(clientId); + } + + /** + * @param clientId + * @param connection + * @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for + * the {@code clientId} + */ + public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { + return connectedClients.put(clientId, connection); + } + + public MQTTConnection getConnectedClient(String clientId) { + return connectedClients.get(clientId); + } + + /** For DEBUG only */ + public Map getConnectedClients() { + return connectedClients; + } +} \ No newline at end of file diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 176a1c18ca..dedacd55e9 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -38,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; -import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH; import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING; @@ -47,6 +46,8 @@ public class MQTTSubscriptionManager { private final MQTTSession session; + private final MQTTStateManager stateManager; + private final ConcurrentMap consumerQoSLevels; private final ConcurrentMap consumers; @@ -66,8 +67,9 @@ public class MQTTSubscriptionManager { private final char anyWords; - public MQTTSubscriptionManager(MQTTSession session) { + public MQTTSubscriptionManager(MQTTSession session, MQTTStateManager stateManager) { this.session = session; + this.stateManager = stateManager; singleWord = session.getServer().getConfiguration().getWildcardConfiguration().getSingleWord(); anyWords = session.getServer().getConfiguration().getWildcardConfiguration().getAnyWords(); @@ -130,7 +132,7 @@ public class MQTTSubscriptionManager { session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier); } } catch (Exception e) { - // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed + // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue q.deleteQueue(); throw e; } @@ -246,56 +248,52 @@ public class MQTTSubscriptionManager { consumerQoSLevels.put(cid, qos); } - short[] removeSubscriptions(List topics) throws Exception { + short[] removeSubscriptions(List topics, boolean enforceSecurity) throws Exception { short[] reasonCodes; + MQTTSessionState state = session.getState(); - synchronized (session.getState()) { + synchronized (state) { reasonCodes = new short[topics.size()]; for (int i = 0; i < topics.size(); i++) { - reasonCodes[i] = removeSubscription(topics.get(i)); + if (session.getState().getSubscription(topics.get(i)) == null) { + reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED; + continue; + } + + short reasonCode = MQTTReasonCodes.SUCCESS; + + try { + session.getState().removeSubscription(topics.get(i)); + ServerConsumer removed = consumers.remove(parseTopicName(topics.get(i))); + if (removed != null) { + removed.close(false); + consumerQoSLevels.remove(removed.getID()); + } + + SimpleString internalQueueName = getQueueNameForTopic(topics.get(i)); + Queue queue = session.getServer().locateQueue(internalQueueName); + if (queue != null) { + if (queue.isConfigurationManaged()) { + queue.deleteAllReferences(); + } else if (!topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) { + session.getServerSession().deleteQueue(internalQueueName, enforceSecurity); + } + } + } catch (Exception e) { + MQTTLogger.LOGGER.errorRemovingSubscription(e); + reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR; + } + + reasonCodes[i] = reasonCode; } + + // store state after *all* requested subscriptions have been removed in memory + stateManager.storeSessionState(state); } return reasonCodes; } - private short removeSubscription(String address) { - return removeSubscription(address, true); - } - - private short removeSubscription(String topic, boolean enforceSecurity) { - if (session.getState().getSubscription(topic) == null) { - return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED; - } - - short reasonCode = MQTTReasonCodes.SUCCESS; - - try { - session.getState().removeSubscription(topic); - - ServerConsumer removed = consumers.remove(parseTopicName(topic)); - if (removed != null) { - removed.close(false); - consumerQoSLevels.remove(removed.getID()); - } - - SimpleString internalQueueName = getQueueNameForTopic(topic); - Queue queue = session.getServer().locateQueue(internalQueueName); - if (queue != null) { - if (queue.isConfigurationManaged()) { - queue.deleteAllReferences(); - } else if (!topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) { - session.getServerSession().deleteQueue(internalQueueName, enforceSecurity); - } - } - } catch (Exception e) { - MQTTLogger.LOGGER.errorRemovingSubscription(e); - reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR; - } - - return reasonCode; - } - private SimpleString getQueueNameForTopic(String topic) { if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { int slashIndex = topic.indexOf(SLASH) + 1; @@ -308,19 +306,15 @@ public class MQTTSubscriptionManager { } /** - * As per MQTT Spec. Subscribes this client to a number of MQTT topics. + * As per MQTT Spec. Subscribes this client to a number of MQTT topics. * * @param subscriptions * @return An array of integers representing the list of accepted QoS for each topic. * @throws Exception */ - int[] addSubscriptions(List subscriptions, MqttProperties properties) throws Exception { - synchronized (session.getState()) { - Integer subscriptionIdentifier = null; - if (properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()) != null) { - subscriptionIdentifier = (Integer) properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()).value(); - } - + int[] addSubscriptions(List subscriptions, Integer subscriptionIdentifier) throws Exception { + MQTTSessionState state = session.getState(); + synchronized (state) { int[] qos = new int[subscriptions.size()]; for (int i = 0; i < subscriptions.size(); i++) { @@ -354,6 +348,10 @@ public class MQTTSubscriptionManager { } } } + + // store state after *all* requested subscriptions have been created in memory + stateManager.storeSessionState(state); + return qos; } } @@ -362,9 +360,11 @@ public class MQTTSubscriptionManager { return consumerQoSLevels; } - void clean(boolean enforceSecurity) { + void clean(boolean enforceSecurity) throws Exception { + List topics = new ArrayList<>(); for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) { - removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity); + topics.add(mqttTopicSubscription.topicName()); } + removeSubscriptions(topics, enforceSecurity); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 1f77de5c1b..4b59ea2489 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -86,6 +86,8 @@ public class MQTTUtil { public static final char SLASH = '/'; + public static final String MQTT_SESSION_STORE = DOLLAR + "sys.mqtt.sessions"; + public static final String MQTT_RETAIN_ADDRESS_PREFIX = DOLLAR + "sys.mqtt.retain."; public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level"); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java new file mode 100644 index 0000000000..11d2f025bf --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/StateSerDeTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttSubscriptionOption; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StateSerDeTest { + + @Test(timeout = 30000) + public void testSerDe() throws Exception { + for (int i = 0; i < 500; i++) { + String clientId = RandomUtil.randomString(); + MQTTSessionState unserialized = new MQTTSessionState(clientId, null); + Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull(); + for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) { + MqttTopicSubscription sub = new MqttTopicSubscription(RandomUtil.randomString(), + new MqttSubscriptionOption(MqttQoS.valueOf(RandomUtil.randomInterval(0, 3)), + RandomUtil.randomBoolean(), + RandomUtil.randomBoolean(), + MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(RandomUtil.randomInterval(0, 3)))); + unserialized.addSubscription(sub, MQTTUtil.MQTT_WILDCARD, subscriptionIdentifier); + } + + CoreMessage serializedState = MQTTStateManager.serializeState(unserialized, 0); + MQTTSessionState deserialized = new MQTTSessionState(serializedState, null); + + assertEquals(unserialized.getClientId(), deserialized.getClientId()); + for (Pair unserializedEntry : unserialized.getSubscriptionsPlusID()) { + MqttTopicSubscription unserializedSub = unserializedEntry.getA(); + Integer unserializedSubId = unserializedEntry.getB(); + Pair deserializedEntry = deserialized.getSubscriptionPlusID(unserializedSub.topicName()); + MqttTopicSubscription deserializedSub = deserializedEntry.getA(); + Integer deserializedSubId = deserializedEntry.getB(); + + assertTrue(compareSubs(unserializedSub, deserializedSub)); + assertEquals(unserializedSubId, deserializedSubId); + } + } + } + + private boolean compareSubs(MqttTopicSubscription a, MqttTopicSubscription b) { + if (a == b) { + return true; + } + if (a == null || b == null) { + return false; + } + if (a.topicName() == null) { + if (b.topicName() != null) { + return false; + } + } else if (!a.topicName().equals(b.topicName())) { + return false; + } + if (a.option() == null) { + if (b.option() != null) { + return false; + } + } else { + if (a.option().qos() == null) { + if (b.option().qos() != null) { + return false; + } + } else if (a.option().qos().value() != b.option().qos().value()) { + return false; + } + if (a.option().retainHandling() == null) { + if (b.option().retainHandling() != null) { + return false; + } + } else if (a.option().retainHandling().value() != b.option().retainHandling().value()) { + return false; + } + if (a.option().isRetainAsPublished() != b.option().isRetainAsPublished()) { + return false; + } + if (a.option().isNoLocal() != b.option().isNoLocal()) { + return false; + } + } + + return true; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java index 883b116ec7..09592b43f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java @@ -40,4 +40,9 @@ public class InVMAcceptorFactory implements AcceptorFactory { final Map protocolMap) { return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, protocolMap, threadPool); } + + @Override + public boolean supportsRemote() { + return false; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 9f948355d9..eeacf86c8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2226,6 +2226,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Transaction tx = new TransactionImpl(storageManager); synchronized (this) { + // ensure all messages are moved from intermediateMessageReferences so that they can be seen by the iterator + doInternalPoll(); try (LinkedListIterator iter = iterator()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java index 4390d4f0c5..2416e17efc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java @@ -51,4 +51,8 @@ public interface AcceptorFactory { ScheduledExecutorService scheduledThreadPool, Map protocolMap); + default boolean supportsRemote() { + return true; + } + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 6e32ca69e1..2e13383dc5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -585,6 +585,9 @@ public abstract class ActiveMQTestBase extends Assert { if (netty) { configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap(), "netty", new HashMap())); + } else { + // if we're in-vm it's a waste to resolve protocols since they'll never be used + configuration.setResolveProtocols(false); } return configuration; diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc index 106af544e8..d460279bd8 100644 --- a/docs/user-manual/mqtt.adoc +++ b/docs/user-manual/mqtt.adoc @@ -89,6 +89,12 @@ As far as the broker is concerned a payload is just an array of bytes. However, to facilitate logging the broker will encode the payloads as UTF-8 strings and print them up to 256 characters. Payload logging is limited to avoid filling the logs with potentially hundreds of megabytes of unhelpful information. +== Persistent Subscriptions + +The subscription information for MQTT sessions is stored in an internal queue named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is enabled). +The information is durable so that MQTT subscribers can reconnect and resume their subscriptions seamlessly after a broker restart, failure, etc. +When brokers are configured for high availability this information will be available on the backup so even in the case of a broker fail-over subscribers will be able to resume their subscriptions. + == Custom Client ID Handling The client ID used by an MQTT application is very important as it uniquely identifies the application. diff --git a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml index a6fcc0b7d7..020338df2b 100644 --- a/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/divert/src/main/resources/activemq/server0/broker.xml @@ -45,7 +45,7 @@ under the License. - + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java index c135dc5089..bf0580496b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java @@ -28,6 +28,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; import org.apache.qpid.jms.JmsConnectionFactory; @@ -58,6 +60,21 @@ public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase { this.protocol = protocol; } + @Override + protected void createConfigs() throws Exception { + nodeManager = createNodeManager(); + TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + + backupConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName())); + + backupServer = createTestableServer(backupConfig); + + liveConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); + + liveServer = createTestableServer(liveConfig); + } + @Override protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { return getNettyAcceptorTransportConfig(live); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java index 53263d3772..feed08107d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java @@ -59,8 +59,8 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport { super.setUp(); createReplicatedConfigs(); - liveConfig.addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); - backupConfig.addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); + liveConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); + backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512"); liveServer.start(); backupServer.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java index 4e75fa3df9..fb3be03d54 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageExpirationTest.java @@ -288,6 +288,7 @@ public class MessageExpirationTest extends ActiveMQTestBase { server = createServer(true); server.getConfiguration().addAcceptorConfiguration("amqp", "tcp://127.0.0.1:61616"); + server.getConfiguration().setResolveProtocols(true); server.getConfiguration().setMessageExpiryScanPeriod(200); server.start(); locator = createInVMNonHALocator(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java index 657676dc51..558023a64d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java @@ -40,9 +40,9 @@ public class UpdateQueueTest extends ActiveMQTestBase { @Test public void testUpdateQueueWithNullUser() throws Exception { - ActiveMQServer server = createServer(true, true); + ActiveMQServer server = createServer(true, false); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0"); server.start(); @@ -80,7 +80,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { Assert.assertEquals("newUser", user, queue.getUser()); - factory = new ActiveMQConnectionFactory(); + factory = new ActiveMQConnectionFactory("vm://0"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -110,9 +110,9 @@ public class UpdateQueueTest extends ActiveMQTestBase { @Test public void testUpdateQueue() throws Exception { - ActiveMQServer server = createServer(true, true); + ActiveMQServer server = createServer(true, false); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0"); server.start(); @@ -169,7 +169,7 @@ public class UpdateQueueTest extends ActiveMQTestBase { Assert.assertEquals("newUser", queue.getUser().toString()); Assert.assertEquals(180L, queue.getRingSize()); - factory = new ActiveMQConnectionFactory(); + factory = new ActiveMQConnectionFactory("vm://0"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java index 03bed28ffc..72b6474bb5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MessageJournalTest.java @@ -86,7 +86,7 @@ public class MessageJournalTest extends ActiveMQTestBase { @Test public void testStoreAMQP() throws Throwable { - ActiveMQServer server = createServer(true); + ActiveMQServer server = createServer(true, true); server.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java index a2c7bde455..7fda8a1b2b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTFQQNTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.junit.Test; @@ -34,8 +35,9 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - assertEquals(1, server.getPostOffice().getAllBindings().count()); - Binding b = server.getPostOffice().getAllBindings().iterator().next(); + Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah")); + assertEquals(1, bindings.size()); + Binding b = bindings.getBindings().iterator().next(); //check that query using bare queue name works as before QueueQueryResult result = server.queueQuery(b.getUniqueName()); assertTrue(result.isExists()); @@ -126,8 +128,9 @@ public class MQTTFQQNTest extends MQTTTestSupport { try { subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); - assertEquals(1, server.getPostOffice().getAllBindings().count()); - Binding b = server.getPostOffice().getAllBindings().iterator().next(); + Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah")); + assertEquals(1, bindings.size()); + Binding b = bindings.getBindings().iterator().next(); //check ::queue QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java index f55876101b..802af06186 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java @@ -100,7 +100,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates(); + sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } assertEquals(1, sessionStates.size()); @@ -132,7 +132,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates(); + sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } assertEquals(1, sessionStates.size()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java index e332da72a7..1a5e673a01 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java @@ -2099,10 +2099,10 @@ public class MQTTTest extends MQTTTestSupport { final int port2 = 1885; final Configuration cfg1 = createDefaultConfig(1, false); - cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT"); + cfg1.setResolveProtocols(true).addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT"); final Configuration cfg2 = createDefaultConfig(2, false); - cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT"); + cfg2.setResolveProtocols(true).addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT"); final ActiveMQServer server1 = createServer(cfg1); server1.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java index 9f8bcd0d0a..491c065fe3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java @@ -387,7 +387,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { if (acceptor instanceof AbstractAcceptor) { ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); if (protocolManager instanceof MQTTProtocolManager) { - return ((MQTTProtocolManager) protocolManager).getSessionStates(); + return ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java index 4f7d35b1d8..04eaf7a88f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.java @@ -68,14 +68,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; subConnection2.subscribe(topics); @@ -258,14 +258,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; @@ -456,14 +456,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); //Waiting for the first sub connection be closed assertTrue(waitConnectionClosed(subConnection1)); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); subConnection1 = null; subConnection2.subscribe(topics); @@ -622,7 +622,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -711,9 +711,9 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); - Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size); // Subscribe to topics connection1.subscribe(topics); @@ -924,7 +924,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { Thread.sleep(1000); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); @@ -1029,7 +1029,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); - Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size); + Wait.assertEquals(2, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size); subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); //Waiting for the first sub connection be closed diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java index 32de7c147c..8a6341a8b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -221,7 +222,14 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport { messageConsumer.close(); messageConsumerAllNews.close(); - int countOfPageStores = server.getPagingManager().getStoreNames().length; + int countOfPageStores = 0; + SimpleString[] storeNames = server.getPagingManager().getStoreNames(); + for (int i = 0; i < storeNames.length; i++) { + if (!storeNames[i].equals(SimpleString.toSimpleString(MQTTUtil.MQTT_SESSION_STORE))) { + countOfPageStores++; + } + } + assertEquals("there should be 5", 5, countOfPageStores); connection.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 00e3a95a4b..e806d2a9e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -45,6 +45,7 @@ import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.UserProperty; import org.junit.Test; @@ -82,6 +83,69 @@ public class MQTT5Test extends MQTT5TestSupport { context.close(); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testResumeSubscriptionsAfterRestart() throws Exception { + final int SUBSCRIPTION_COUNT = 100; + List topicNames = new ArrayList<>(SUBSCRIPTION_COUNT); + for (int i = 0; i < SUBSCRIPTION_COUNT; i++) { + topicNames.add(getName() + i); + } + + CountDownLatch latch = new CountDownLatch(SUBSCRIPTION_COUNT); + MqttClient consumer = createPahoClient("myConsumerID"); + MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder() + .cleanStart(false) + .sessionExpiryInterval(999L) + .build(); + consumer.connect(consumerOptions); + List subs = new ArrayList<>(SUBSCRIPTION_COUNT); + for (String subName : topicNames) { + subs.add(new MqttSubscription(subName, 1)); + } + consumer.subscribe(subs.toArray(new MqttSubscription[0])); + consumer.disconnect(); + + MqttClient producer = createPahoClient("myProducerID"); + MqttConnectionOptions producerOptions = new MqttConnectionOptionsBuilder() + .sessionExpiryInterval(0L) + .build(); + producer.connect(producerOptions); + for (String subName : topicNames) { + producer.publish(subName, new byte[0], 1, false); + } + producer.disconnect(); + producer.close(); + + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100); + + server.stop(); + server.start(); + + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100); + Wait.assertTrue(() -> getSessionStates().get("myConsumerID") != null, 2000, 100); + consumer.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + if (topicNames.remove(topic)) { + latch.countDown(); + } + } + }); + consumerOptions = new MqttConnectionOptionsBuilder() + .cleanStart(false) + .sessionExpiryInterval(0L) + .build(); + consumer.connect(consumerOptions); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + consumer.unsubscribe(topicNames.toArray(new String[0])); + consumer.disconnect(); + consumer.close(); + Wait.assertEquals(0L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 5000, 100); + } + + /* + * Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184 + */ @Test(timeout = DEFAULT_TIMEOUT) public void testAddressAutoCreation() throws Exception { final String DESTINATION = RandomUtil.randomString(); @@ -470,6 +534,60 @@ public class MQTT5Test extends MQTT5TestSupport { client.close(); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingOnMultipleAcceptors() throws Exception { + int secondaryPort = 1884; + final String CLIENT_ID = RandomUtil.randomString(); + + server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort); + server.getRemotingService().startAcceptors(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + + MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort); + client2.connect(); + + // only 1 session should exist + Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertFalse(client.isConnected()); + + client.close(); + client2.disconnect(); + client2.close(); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingDisabledOnMultipleAcceptors() throws Exception { + int secondaryPort = 1884; + final String CLIENT_ID = RandomUtil.randomString(); + + server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort + "?allowLinkStealing=false"); + server.getRemotingService().startAcceptors(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + + MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort); + try { + client2.connect(); + fail("Should have thrown an exception on connect due to disabled link stealing"); + } catch (Exception e) { + // ignore expected exception + } + + // only 1 session should exist + Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertTrue(client.isConnected()); + + client.disconnect(); + client.close(); + } + @Test(timeout = DEFAULT_TIMEOUT) public void testQueueCleanedUpOnConsumerFail() throws Exception { final String topic = getName(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java index 6b191e0e34..70b0a34733 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java @@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase { return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence()); } + protected MqttClient createPahoClient(String clientId, int port) throws MqttException { + return new MqttClient(protocol + "://localhost:" + port, clientId, new MemoryPersistence()); + } + protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException { return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()); } @@ -333,12 +337,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase { if (protocolManager == null) { return Collections.emptyMap(); } else { - return protocolManager.getSessionStates(); + return protocolManager.getStateManager().getSessionStates(); } } public void scanSessions() { - getProtocolManager().scanSessions(); + getProtocolManager().getStateManager().scanSessions(); } public MQTTProtocolManager getProtocolManager() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java index c6b4c3b088..60d1302a98 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java @@ -76,7 +76,13 @@ public class MessageReceiptTests extends MQTT5TestSupport { for (int i = 0; i < CONSUMER_COUNT; i++) { producer.publish(TOPIC + i, ("hello" + i).getBytes(), 0, false); } - Wait.assertEquals((long) CONSUMER_COUNT, () -> server.getActiveMQServerControl().getTotalMessagesAdded(), 2000, 100); + Wait.assertEquals((long) CONSUMER_COUNT, () -> { + int totalMessagesAdded = 0; + for (int i = 0; i < CONSUMER_COUNT; i++) { + totalMessagesAdded += getSubscriptionQueue(TOPIC + i).getMessagesAdded(); + } + return totalMessagesAdded; + }, 2000, 100); producer.disconnect(); producer.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java index 34bc04e54f..64c84c4945 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityPerAcceptorJmsTest.java @@ -99,7 +99,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { @Test public void testJAASSecurityManagerAuthentication() throws Exception { - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); server.start(); try (Connection c = cf.createConnection("first", "secret")) { Thread.sleep(200); @@ -113,7 +113,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { final SimpleString ADDRESS = new SimpleString("address"); ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(); - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); Set roles = new HashSet<>(); roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false)); server.getConfiguration().putSecurityRoles("#", roles); @@ -163,7 +163,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase { public void testJAASSecurityManagerAuthorizationPositive() throws Exception { final String ADDRESS = "address"; - ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false)); Set roles = new HashSet<>(); roles.add(new Role("programmers", true, true, true, true, true, true, true, true, true, true)); server.getConfiguration().putSecurityRoles("#", roles); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java index cc22d6389b..3526c3cec8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/SecurityTest.java @@ -328,7 +328,7 @@ public class SecurityTest extends ActiveMQTestBase { params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass"); params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); - server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); + server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); // ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory HashSet roles = new HashSet<>(); @@ -374,7 +374,7 @@ public class SecurityTest extends ActiveMQTestBase { params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass"); params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); - server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); + server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); server.start(); ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616?verifyHostName=false");