ARTEMIS-966 MQTT subscription state isn't durable

Durable subscrption state is part of the MQTT specification which has
not been supported until now. This functionality is implemented via an
internal last-value queue. When an MQTT client creates, updates, or
adds a subscription a message using the client-ID as the last-value is
sent to the internal queue. When the broker restarts this data is read
from the queue and populates the in-memory MQTT data-structures.
Therefore subscribers can reconnect and resume their session's
subscriptions without have to manually resubscribe.

MQTT state is now managed centrally per-broker rather than in the
MQTTProtocolManager since there is one instance of MQTTProtocolManager
for each acceptor allowing MQTT connections. Managing state per acceptor
would allow odd behavior with clients connecting to different acceptors
with the same client ID.

The subscriptions are serialized as raw bytes with a "version" byte for
potential future use, but I intentionally avoided adding complex
scaffolding to support multiple versions. We can add that complexity
later if necessary.

Some tests needed to be changed since instantiating an MQTT protocol
manager now creates an internal queue. A handful of tests assume that no
queues will exist other than the ones they create themselves. I updated
the main test super-class so that an MQTT protocol manager is not
automatically instantiated when configuring a broker for in-vm support.
This commit is contained in:
Justin Bertram 2022-12-20 11:47:18 -06:00 committed by Robbie Gemmell
parent e7a27f0342
commit af2672e79a
36 changed files with 814 additions and 229 deletions

View File

@ -60,6 +60,11 @@ public class RandomUtil {
return Math.abs(RandomUtil.randomInt());
}
public static Integer randomPositiveIntOrNull() {
Integer random = RandomUtil.randomInt();
return random % 5 == 0 ? null : Math.abs(random);
}
public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);

View File

@ -1270,9 +1270,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public String toString() {
try {
final TypedProperties properties = getProperties();
return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this);
return "CoreMessage[messageID=" + messageID +
", durable=" + isDurable() +
", userID=" + getUserID() +
", priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) +
", expiration=" + toDate(getExpiration()) +
", durable=" + durable +
", address=" + getAddress() +
", size=" + getPersistentSize() +
", properties=" + properties +
"]@" + System.identityHashCode(this);
} catch (Throwable e) {
logger.warn("Error creating String for message: ", e);
return "ServerMessage[messageID=" + messageID + "]";

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.logs.BundleFactory;
import org.apache.activemq.artemis.logs.annotation.LogBundle;
import org.apache.activemq.artemis.logs.annotation.Message;
/**
* Logger Code 85
*/
@LogBundle(projectCode = "AMQ", regexID = "85[0-9]{4}")
public interface MQTTBundle {
MQTTBundle BUNDLE = BundleFactory.newBundle(MQTTBundle.class);
@Message(id = 850000, value = "Unable to store MQTT state within given timeout: {}ms")
IllegalStateException unableToStoreMqttState(long timeout);
}

View File

@ -67,7 +67,7 @@ public class MQTTConnectionManager {
boolean cleanStart = connect.variableHeader().isCleanSession();
String clientId = session.getConnection().getClientID();
boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId);
boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
@ -120,6 +120,7 @@ public class MQTTConnectionManager {
connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}
@ -193,15 +194,15 @@ public class MQTTConnectionManager {
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) {
session.getProtocolManager().removeConnectedClient(clientId);
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
}
}
}
}
private synchronized MQTTSessionState getSessionState(String clientId) {
return session.getProtocolManager().getSessionState(clientId);
private synchronized MQTTSessionState getSessionState(String clientId) throws Exception {
return session.getStateManager().getSessionState(clientId);
}
}

View File

@ -58,4 +58,7 @@ public interface MQTTLogger {
@LogMessage(id = 834007, value = "Authorization failure sending will message: {}", level = LogMessage.Level.ERROR)
void authorizationFailureSendingWillMessage(String message);
@LogMessage(id = 834008, value = "Failed to remove session state for client with ID: {}", level = LogMessage.Level.ERROR)
void failedToRemoveSessionState(String clientID, Exception e);
}

View File

@ -44,8 +44,8 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
@ -57,6 +57,7 @@ import java.lang.invoke.MethodHandles;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
@ -257,7 +258,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
return;
} else {
protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection());
protocolManager.getStateManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection());
}
if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) {
@ -377,7 +378,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handleSubscribe(MqttSubscribeMessage message) throws Exception {
int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), message.idAndPropertiesVariableHeader().properties());
Integer subscriptionIdentifier = MQTTUtil.getProperty(Integer.class, message.idAndPropertiesVariableHeader().properties(), SUBSCRIPTION_IDENTIFIER, null);
int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), subscriptionIdentifier);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(message.variableHeader().messageId(), MqttProperties.NO_PROPERTIES);
MqttSubAckMessage subAck = new MqttSubAckMessage(header, variableHeader, new MqttSubAckPayload(qos));
@ -385,7 +387,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics(), true);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage unsubAck;
if (session.getVersion() == MQTTVersion.MQTT_5) {
@ -462,14 +464,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
*
* However, this behavior is configurable via the "allowLinkStealing" acceptor URL property.
*/
private LinkStealingResult handleLinkStealing() {
private LinkStealingResult handleLinkStealing() throws Exception {
final String clientID = session.getConnection().getClientID();
LinkStealingResult result;
if (protocolManager.isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID);
if (protocolManager.getStateManager().isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getStateManager().getConnectedClient(clientID);
if (protocolManager.isAllowLinkStealing()) {
MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession();
MQTTSession existingSession = protocolManager.getStateManager().getSessionState(clientID).getSession();
if (existingSession != null) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);

View File

@ -16,12 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -36,6 +34,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
@ -47,7 +46,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRoutingHandler> implements NotificationListener {
@ -60,9 +58,6 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>();
private int defaultMqttSessionExpiryInterval = -1;
private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX;
@ -79,13 +74,23 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private final MQTTRoutingHandler routingHandler;
private MQTTStateManager sessionStateManager;
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
List<BaseInterceptor> outgoingInterceptors) throws Exception {
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
server.getManagementService().addNotificationListener(this);
routingHandler = new MQTTRoutingHandler(server);
sessionStateManager = MQTTStateManager.getInstance(server);
server.registerActivateCallback(new CleaningActivateCallback() {
@Override
public void deActivate() {
MQTTStateManager.removeInstance(server);
sessionStateManager = null;
}
});
}
public int getDefaultMqttSessionExpiryInterval() {
@ -176,7 +181,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
* in the SESSION_CREATED notification, you need to close this connection.
* Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time.
*/
MQTTConnection mqttConnection = connectedClients.get(clientId);
MQTTConnection mqttConnection = sessionStateManager.getConnectedClients().get(clientId);
if (mqttConnection != null) {
mqttConnection.destroy();
}
@ -197,39 +202,6 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
public void scanSessions() {
List<String> toRemove = new ArrayList();
for (Map.Entry<String, MQTTSessionState> entry : sessionStates.entrySet()) {
MQTTSessionState state = entry.getValue();
logger.debug("Inspecting session: {}", state);
int sessionExpiryInterval = getSessionExpiryInterval(state);
if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
}
if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
state.getSession().sendWillMessage();
}
}
for (String key : toRemove) {
logger.debug("Removing state for session: {}", key);
MQTTSessionState state = removeSessionState(key);
if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
}
}
}
private int getSessionExpiryInterval(MQTTSessionState state) {
int sessionExpiryInterval;
if (state.getClientSessionExpiryInterval() == 0) {
sessionExpiryInterval = getDefaultMqttSessionExpiryInterval();
} else {
sessionExpiryInterval = state.getClientSessionExpiryInterval();
}
return sessionExpiryInterval;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
try {
@ -348,56 +320,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
return super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
public boolean isClientConnected(String clientId, MQTTConnection connection) {
MQTTConnection connectedConn = connectedClients.get(clientId);
if (connectedConn != null) {
return connectedConn.equals(connection);
}
return false;
}
public boolean isClientConnected(String clientId) {
return connectedClients.containsKey(clientId);
}
public void removeConnectedClient(String clientId) {
connectedClients.remove(clientId);
}
/**
* @param clientId
* @param connection
* @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for
* the {@code clientId}
*/
public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
return connectedClients.put(clientId, connection);
}
public MQTTConnection getConnectedClient(String clientId) {
return connectedClients.get(clientId);
}
public MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
}
public MQTTSessionState removeSessionState(String clientId) {
if (clientId == null) {
return null;
}
return sessionStates.remove(clientId);
}
public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates);
}
/** For DEBUG only */
public Map<String, MQTTConnection> getConnectedClients() {
return connectedClients;
public MQTTStateManager getStateManager() {
return sessionStateManager;
}
}

View File

@ -84,7 +84,7 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<M
if (protocolHandler != null) {
protocolHandler.getProtocolMap().values().forEach(m -> {
if (m instanceof MQTTProtocolManager) {
((MQTTProtocolManager)m).scanSessions();
((MQTTProtocolManager)m).getStateManager().scanSessions();
}
});
}

View File

@ -64,6 +64,8 @@ public class MQTTSession {
private MQTTProtocolManager protocolManager;
private MQTTStateManager stateManager;
private boolean clean;
private WildcardConfiguration wildcardConfiguration;
@ -80,6 +82,7 @@ public class MQTTSession {
WildcardConfiguration wildcardConfiguration) throws Exception {
this.protocolHandler = protocolHandler;
this.protocolManager = protocolManager;
this.stateManager = protocolManager.getStateManager();
this.wildcardConfiguration = wildcardConfiguration;
this.connection = connection;
@ -87,7 +90,7 @@ public class MQTTSession {
mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
sessionCallback = new MQTTSessionCallback(this, connection);
subscriptionManager = new MQTTSubscriptionManager(this);
subscriptionManager = new MQTTSubscriptionManager(this, stateManager);
retainMessageManager = new MQTTRetainMessageManager(this);
state = MQTTSessionState.DEFAULT;
@ -120,11 +123,9 @@ public class MQTTSession {
internalServerSession.close(false);
}
if (state != null) {
state.setAttached(false);
state.setDisconnectedTime(System.currentTimeMillis());
state.clearTopicAliases();
}
state.setAttached(false);
state.setDisconnectedTime(System.currentTimeMillis());
state.clearTopicAliases();
if (getVersion() == MQTTVersion.MQTT_5) {
if (state.getClientSessionExpiryInterval() == 0) {
@ -133,9 +134,7 @@ public class MQTTSession {
sendWillMessage();
}
clean(false);
protocolManager.removeSessionState(connection.getClientID());
} else {
state.setDisconnectedTime(System.currentTimeMillis());
stateManager.removeSessionState(connection.getClientID());
}
} else {
if (state.isWill() && failure) {
@ -143,7 +142,7 @@ public class MQTTSession {
}
if (isClean()) {
clean(false);
protocolManager.removeSessionState(connection.getClientID());
stateManager.removeSessionState(connection.getClientID());
}
}
}
@ -226,6 +225,10 @@ public class MQTTSession {
return protocolManager;
}
MQTTStateManager getStateManager() {
return stateManager;
}
void clean(boolean enforceSecurity) throws Exception {
subscriptionManager.clean(enforceSecurity);
mqttPublishManager.clean();
@ -290,6 +293,9 @@ public class MQTTSession {
@Override
public String toString() {
return "MQTTSession[coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") + "]";
return "MQTTSession[" +
"coreSessionId: " + (serverSession != null ? serverSession.getName() : "null") +
", clientId: " + state.getClientId() +
"]";
}
}

View File

@ -30,9 +30,14 @@ import java.util.regex.Pattern;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.settings.impl.Match;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,11 +47,13 @@ public class MQTTSessionState {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);
public static final MQTTSessionState DEFAULT = new MQTTSessionState((String) null, null);
private MQTTSession session;
private String clientId;
private final String clientId;
private final MQTTStateManager stateManager;
private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>> subscriptions = new ConcurrentHashMap<>();
@ -91,8 +98,50 @@ public class MQTTSessionState {
private Map<String, Integer> serverTopicAliases;
public MQTTSessionState(String clientId) {
public MQTTSessionState(String clientId, MQTTStateManager stateManager) {
this.clientId = clientId;
this.stateManager = stateManager;
}
/**
* This constructor deserializes session data from a message. The format is as follows.
*
* - byte: version
* - int: subscription count
*
* There may be 0 or more subscriptions. The subscription format is as follows.
*
* - String: topic name
* - int: QoS
* - boolean: no-local
* - boolean: retain as published
* - int: retain handling
* - int (nullable): subscription identifier
*
* @param message the message holding the MQTT session data
* @param stateManager the manager used to add and remove sessions from storage
*/
public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager) {
logger.debug("Deserializing MQTT session state from {}", message);
this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
this.stateManager = stateManager;
ActiveMQBuffer buf = message.getDataBuffer();
// no need to use the version at this point
byte version = buf.readByte();
int subscriptionCount = buf.readInt();
logger.debug("Deserializing {} subscriptions", subscriptionCount);
for (int i = 0; i < subscriptionCount; i++) {
String topicName = buf.readString();
MqttQoS qos = MqttQoS.valueOf(buf.readInt());
boolean nolocal = buf.readBoolean();
boolean retainAsPublished = buf.readBoolean();
MqttSubscriptionOption.RetainedHandlingPolicy retainedHandlingPolicy = MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(buf.readInt());
Integer subscriptionId = buf.readNullableInt();
subscriptions.put(topicName, new Pair<>(new MqttTopicSubscription(topicName, new MqttSubscriptionOption(qos, nolocal, retainAsPublished, retainedHandlingPolicy)), subscriptionId));
}
}
public MQTTSession getSession() {
@ -103,7 +152,7 @@ public class MQTTSessionState {
this.session = session;
}
public synchronized void clear() {
public synchronized void clear() throws Exception {
subscriptions.clear();
messageRefStore.clear();
addressMessageMap.clear();
@ -148,7 +197,11 @@ public class MQTTSessionState {
return result;
}
public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) {
public Collection<Pair<MqttTopicSubscription, Integer>> getSubscriptionsPlusID() {
return subscriptions.values();
}
public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) throws Exception {
// synchronized to prevent race with removeSubscription
synchronized (subscriptions) {
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCoreAddress(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<>());
@ -172,7 +225,7 @@ public class MQTTSessionState {
}
}
public void removeSubscription(String address) {
public void removeSubscription(String address) throws Exception {
// synchronized to prevent race with addSubscription
synchronized (subscriptions) {
subscriptions.remove(address);
@ -184,6 +237,10 @@ public class MQTTSessionState {
return subscriptions.get(address) != null ? subscriptions.get(address).getA() : null;
}
public Pair<MqttTopicSubscription, Integer> getSubscriptionPlusID(String address) {
return subscriptions.get(address) != null ? subscriptions.get(address) : null;
}
public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, session.getServer().getConfiguration().getWildcardConfiguration());
List<Integer> result = null;
@ -207,10 +264,6 @@ public class MQTTSessionState {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public long getDisconnectedTime() {
return disconnectedTime;
}
@ -372,6 +425,29 @@ public class MQTTSessionState {
}
}
@Override
public String toString() {
return "MQTTSessionState[session=" + session +
", clientId=" + clientId +
", subscriptions=" + subscriptions +
", messageRefStore=" + messageRefStore +
", addressMessageMap=" + addressMessageMap +
", pubRec=" + pubRec +
", attached=" + attached +
", outboundStore=" + outboundStore +
", disconnectedTime=" + disconnectedTime +
", sessionExpiryInterval=" + clientSessionExpiryInterval +
", isWill=" + isWill +
", willMessage=" + willMessage +
", willTopic=" + willTopic +
", willQoSLevel=" + willQoSLevel +
", willRetain=" + willRetain +
", willDelayInterval=" + willDelayInterval +
", failed=" + failed +
", maxPacketSize=" + clientMaxPacketSize +
"]@" + System.identityHashCode(this);
}
public class OutboundStore {
private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
@ -445,11 +521,6 @@ public class MQTTSessionState {
}
}
@Override
public String toString() {
return "MQTTSessionState[" + "session=" + session + ", clientId='" + clientId + "', subscriptions=" + subscriptions + ", messageRefStore=" + messageRefStore + ", addressMessageMap=" + addressMessageMap + ", pubRec=" + pubRec + ", attached=" + attached + ", outboundStore=" + outboundStore + ", disconnectedTime=" + disconnectedTime + ", sessionExpiryInterval=" + clientSessionExpiryInterval + ", isWill=" + isWill + ", willMessage=" + willMessage + ", willTopic='" + willTopic + "', willQoSLevel=" + willQoSLevel + ", willRetain=" + willRetain + ", willDelayInterval=" + willDelayInterval + ", failed=" + failed + ", maxPacketSize=" + clientMaxPacketSize + ']';
}
public enum WillStatus {
NOT_SENT, SENT, SENDING;

View File

@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTStateManager {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ActiveMQServer server;
private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>();
private final Queue sessionStore;
private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
/*
* Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want
* one instance of MQTTSessionStateManager per-broker with the understanding that there can be multiple brokers in
* the same JVM.
*/
public static synchronized MQTTStateManager getInstance(ActiveMQServer server) throws Exception {
MQTTStateManager instance = INSTANCES.get(System.identityHashCode(server));
if (instance == null) {
instance = new MQTTStateManager(server);
INSTANCES.put(System.identityHashCode(server), instance);
}
return instance;
}
public static synchronized void removeInstance(ActiveMQServer server) {
INSTANCES.remove(System.identityHashCode(server));
}
private MQTTStateManager(ActiveMQServer server) throws Exception {
this.server = server;
sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true);
// load session data from queue
try (LinkedListIterator<MessageReference> iterator = sessionStore.browserIterator()) {
try {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this);
sessionStates.put(clientId, sessionState);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
}
}
public void scanSessions() {
List<String> toRemove = new ArrayList();
for (Map.Entry<String, MQTTSessionState> entry : sessionStates.entrySet()) {
MQTTSessionState state = entry.getValue();
logger.debug("Inspecting session: {}", state);
int sessionExpiryInterval = state.getClientSessionExpiryInterval();
if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
}
if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
state.getSession().sendWillMessage();
}
}
for (String key : toRemove) {
try {
MQTTSessionState state = removeSessionState(key);
if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
}
} catch (Exception e) {
MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
}
}
}
public MQTTSessionState getSessionState(String clientId) throws Exception {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
if (sessionStates.containsKey(clientId)) {
return sessionStates.get(clientId);
} else {
MQTTSessionState sessionState = new MQTTSessionState(clientId, this);
logger.debug("Adding MQTT session state for: {}", clientId);
sessionStates.put(clientId, sessionState);
storeSessionState(sessionState);
return sessionState;
}
}
public MQTTSessionState removeSessionState(String clientId) throws Exception {
logger.debug("Removing MQTT session state for: {}", clientId);
if (clientId == null) {
return null;
}
removeDurableSessionState(clientId);
return sessionStates.remove(clientId);
}
public void removeDurableSessionState(String clientId) throws Exception {
int deletedCount = sessionStore.deleteMatchingReferences(FilterImpl.createFilter(new StringBuilder(Message.HDR_LAST_VALUE_NAME).append(" = '").append(clientId).append("'").toString()));
logger.debug("Removed {} durable MQTT state records for: {}", deletedCount, clientId);
}
public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates);
}
@Override
public String toString() {
return "MQTTSessionStateManager@" + Integer.toHexString(System.identityHashCode(this));
}
public void storeSessionState(MQTTSessionState state) throws Exception {
logger.debug("Adding durable MQTT state record for: {}", state.getClientId());
/*
* It is imperative to ensure the routed message is actually *all the way* on the queue before proceeding
* otherwise there can be a race with removing it.
*/
CountDownLatch latch = new CountDownLatch(1);
Transaction tx = new TransactionImpl(server.getStorageManager());
server.getPostOffice().route(serializeState(state, server.getStorageManager().generateID()), tx, false);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
latch.countDown();
}
});
tx.commit();
final long timeout = 5000;
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
}
}
public static CoreMessage serializeState(MQTTSessionState state, long messageID) {
CoreMessage message = new CoreMessage().initBuffer(50).setMessageID(messageID);
message.setAddress(MQTTUtil.MQTT_SESSION_STORE);
message.setDurable(true);
message.putStringProperty(Message.HDR_LAST_VALUE_NAME, state.getClientId());
Collection<Pair<MqttTopicSubscription, Integer>> subscriptions = state.getSubscriptionsPlusID();
ActiveMQBuffer buf = message.getBodyBuffer();
/*
* This byte represents the "version". If the payload changes at any point in the future then we can detect that
* and adjust so that when users are upgrading we can still read the old data format.
*/
buf.writeByte((byte) 0);
buf.writeInt(subscriptions.size());
logger.debug("Serializing {} subscriptions", subscriptions.size());
for (Pair<MqttTopicSubscription, Integer> pair : subscriptions) {
MqttTopicSubscription sub = pair.getA();
buf.writeString(sub.topicName());
buf.writeInt(sub.option().qos().value());
buf.writeBoolean(sub.option().isNoLocal());
buf.writeBoolean(sub.option().isRetainAsPublished());
buf.writeInt(sub.option().retainHandling().value());
buf.writeNullableInt(pair.getB());
}
return message;
}
public boolean isClientConnected(String clientId, MQTTConnection connection) {
MQTTConnection connectedConn = connectedClients.get(clientId);
if (connectedConn != null) {
return connectedConn.equals(connection);
}
return false;
}
public boolean isClientConnected(String clientId) {
return connectedClients.containsKey(clientId);
}
public void removeConnectedClient(String clientId) {
connectedClients.remove(clientId);
}
/**
* @param clientId
* @param connection
* @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for
* the {@code clientId}
*/
public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
return connectedClients.put(clientId, connection);
}
public MQTTConnection getConnectedClient(String clientId) {
return connectedClients.get(clientId);
}
/** For DEBUG only */
public Map<String, MQTTConnection> getConnectedClients() {
return connectedClients;
}
}

View File

@ -16,13 +16,13 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@ -38,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH;
import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
@ -47,6 +46,8 @@ public class MQTTSubscriptionManager {
private final MQTTSession session;
private final MQTTStateManager stateManager;
private final ConcurrentMap<Long, Integer> consumerQoSLevels;
private final ConcurrentMap<String, ServerConsumer> consumers;
@ -66,8 +67,9 @@ public class MQTTSubscriptionManager {
private final char anyWords;
public MQTTSubscriptionManager(MQTTSession session) {
public MQTTSubscriptionManager(MQTTSession session, MQTTStateManager stateManager) {
this.session = session;
this.stateManager = stateManager;
singleWord = session.getServer().getConfiguration().getWildcardConfiguration().getSingleWord();
anyWords = session.getServer().getConfiguration().getWildcardConfiguration().getAnyWords();
@ -130,7 +132,7 @@ public class MQTTSubscriptionManager {
session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier);
}
} catch (Exception e) {
// if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed
// if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue
q.deleteQueue();
throw e;
}
@ -246,56 +248,52 @@ public class MQTTSubscriptionManager {
consumerQoSLevels.put(cid, qos);
}
short[] removeSubscriptions(List<String> topics) throws Exception {
short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) throws Exception {
short[] reasonCodes;
MQTTSessionState state = session.getState();
synchronized (session.getState()) {
synchronized (state) {
reasonCodes = new short[topics.size()];
for (int i = 0; i < topics.size(); i++) {
reasonCodes[i] = removeSubscription(topics.get(i));
if (session.getState().getSubscription(topics.get(i)) == null) {
reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
continue;
}
short reasonCode = MQTTReasonCodes.SUCCESS;
try {
session.getState().removeSubscription(topics.get(i));
ServerConsumer removed = consumers.remove(parseTopicName(topics.get(i)));
if (removed != null) {
removed.close(false);
consumerQoSLevels.remove(removed.getID());
}
SimpleString internalQueueName = getQueueNameForTopic(topics.get(i));
Queue queue = session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else if (!topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) {
session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
}
}
} catch (Exception e) {
MQTTLogger.LOGGER.errorRemovingSubscription(e);
reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR;
}
reasonCodes[i] = reasonCode;
}
// store state after *all* requested subscriptions have been removed in memory
stateManager.storeSessionState(state);
}
return reasonCodes;
}
private short removeSubscription(String address) {
return removeSubscription(address, true);
}
private short removeSubscription(String topic, boolean enforceSecurity) {
if (session.getState().getSubscription(topic) == null) {
return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
}
short reasonCode = MQTTReasonCodes.SUCCESS;
try {
session.getState().removeSubscription(topic);
ServerConsumer removed = consumers.remove(parseTopicName(topic));
if (removed != null) {
removed.close(false);
consumerQoSLevels.remove(removed.getID());
}
SimpleString internalQueueName = getQueueNameForTopic(topic);
Queue queue = session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else if (!topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && queue.getConsumerCount() == 0)) {
session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
}
}
} catch (Exception e) {
MQTTLogger.LOGGER.errorRemovingSubscription(e);
reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR;
}
return reasonCode;
}
private SimpleString getQueueNameForTopic(String topic) {
if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
int slashIndex = topic.indexOf(SLASH) + 1;
@ -308,19 +306,15 @@ public class MQTTSubscriptionManager {
}
/**
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
*
* @param subscriptions
* @return An array of integers representing the list of accepted QoS for each topic.
* @throws Exception
*/
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions, MqttProperties properties) throws Exception {
synchronized (session.getState()) {
Integer subscriptionIdentifier = null;
if (properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()) != null) {
subscriptionIdentifier = (Integer) properties.getProperty(SUBSCRIPTION_IDENTIFIER.value()).value();
}
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions, Integer subscriptionIdentifier) throws Exception {
MQTTSessionState state = session.getState();
synchronized (state) {
int[] qos = new int[subscriptions.size()];
for (int i = 0; i < subscriptions.size(); i++) {
@ -354,6 +348,10 @@ public class MQTTSubscriptionManager {
}
}
}
// store state after *all* requested subscriptions have been created in memory
stateManager.storeSessionState(state);
return qos;
}
}
@ -362,9 +360,11 @@ public class MQTTSubscriptionManager {
return consumerQoSLevels;
}
void clean(boolean enforceSecurity) {
void clean(boolean enforceSecurity) throws Exception {
List<String> topics = new ArrayList<>();
for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) {
removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity);
topics.add(mqttTopicSubscription.topicName());
}
removeSubscriptions(topics, enforceSecurity);
}
}

View File

@ -86,6 +86,8 @@ public class MQTTUtil {
public static final char SLASH = '/';
public static final String MQTT_SESSION_STORE = DOLLAR + "sys.mqtt.sessions";
public static final String MQTT_RETAIN_ADDRESS_PREFIX = DOLLAR + "sys.mqtt.retain.";
public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level");

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StateSerDeTest {
@Test(timeout = 30000)
public void testSerDe() throws Exception {
for (int i = 0; i < 500; i++) {
String clientId = RandomUtil.randomString();
MQTTSessionState unserialized = new MQTTSessionState(clientId, null);
Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull();
for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) {
MqttTopicSubscription sub = new MqttTopicSubscription(RandomUtil.randomString(),
new MqttSubscriptionOption(MqttQoS.valueOf(RandomUtil.randomInterval(0, 3)),
RandomUtil.randomBoolean(),
RandomUtil.randomBoolean(),
MqttSubscriptionOption.RetainedHandlingPolicy.valueOf(RandomUtil.randomInterval(0, 3))));
unserialized.addSubscription(sub, MQTTUtil.MQTT_WILDCARD, subscriptionIdentifier);
}
CoreMessage serializedState = MQTTStateManager.serializeState(unserialized, 0);
MQTTSessionState deserialized = new MQTTSessionState(serializedState, null);
assertEquals(unserialized.getClientId(), deserialized.getClientId());
for (Pair<MqttTopicSubscription, Integer> unserializedEntry : unserialized.getSubscriptionsPlusID()) {
MqttTopicSubscription unserializedSub = unserializedEntry.getA();
Integer unserializedSubId = unserializedEntry.getB();
Pair<MqttTopicSubscription, Integer> deserializedEntry = deserialized.getSubscriptionPlusID(unserializedSub.topicName());
MqttTopicSubscription deserializedSub = deserializedEntry.getA();
Integer deserializedSubId = deserializedEntry.getB();
assertTrue(compareSubs(unserializedSub, deserializedSub));
assertEquals(unserializedSubId, deserializedSubId);
}
}
}
private boolean compareSubs(MqttTopicSubscription a, MqttTopicSubscription b) {
if (a == b) {
return true;
}
if (a == null || b == null) {
return false;
}
if (a.topicName() == null) {
if (b.topicName() != null) {
return false;
}
} else if (!a.topicName().equals(b.topicName())) {
return false;
}
if (a.option() == null) {
if (b.option() != null) {
return false;
}
} else {
if (a.option().qos() == null) {
if (b.option().qos() != null) {
return false;
}
} else if (a.option().qos().value() != b.option().qos().value()) {
return false;
}
if (a.option().retainHandling() == null) {
if (b.option().retainHandling() != null) {
return false;
}
} else if (a.option().retainHandling().value() != b.option().retainHandling().value()) {
return false;
}
if (a.option().isRetainAsPublished() != b.option().isRetainAsPublished()) {
return false;
}
if (a.option().isNoLocal() != b.option().isNoLocal()) {
return false;
}
}
return true;
}
}

View File

@ -40,4 +40,9 @@ public class InVMAcceptorFactory implements AcceptorFactory {
final Map<String, ProtocolManager> protocolMap) {
return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, protocolMap, threadPool);
}
@Override
public boolean supportsRemote() {
return false;
}
}

View File

@ -2226,6 +2226,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager);
synchronized (this) {
// ensure all messages are moved from intermediateMessageReferences so that they can be seen by the iterator
doInternalPoll();
try (LinkedListIterator<MessageReference> iter = iterator()) {

View File

@ -51,4 +51,8 @@ public interface AcceptorFactory {
ScheduledExecutorService scheduledThreadPool,
Map<String, ProtocolManager> protocolMap);
default boolean supportsRemote() {
return true;
}
}

View File

@ -585,6 +585,9 @@ public abstract class ActiveMQTestBase extends Assert {
if (netty) {
configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
} else {
// if we're in-vm it's a waste to resolve protocols since they'll never be used
configuration.setResolveProtocols(false);
}
return configuration;

View File

@ -89,6 +89,12 @@ As far as the broker is concerned a payload is just an array of bytes.
However, to facilitate logging the broker will encode the payloads as UTF-8 strings and print them up to 256 characters.
Payload logging is limited to avoid filling the logs with potentially hundreds of megabytes of unhelpful information.
== Persistent Subscriptions
The subscription information for MQTT sessions is stored in an internal queue named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is enabled).
The information is durable so that MQTT subscribers can reconnect and resume their subscriptions seamlessly after a broker restart, failure, etc.
When brokers are configured for high availability this information will be available on the backup so even in the case of a broker fail-over subscribers will be able to resume their subscriptions.
== Custom Client ID Handling
The client ID used by an MQTT application is very important as it uniquely identifies the application.

View File

@ -45,7 +45,7 @@ under the License.
<!-- We need to create a core queue for the JMS queue explicitly because the bridge will be deployed
before the JMS queue is deployed, so the first time, it otherwise won't find the queue -->
<diverts>
<divert name="order-divert">

View File

@ -28,6 +28,8 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
@ -58,6 +60,21 @@ public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase {
this.protocol = protocol;
}
@Override
protected void createConfigs() throws Exception {
nodeManager = createNodeManager();
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultNettyConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
liveServer = createTestableServer(liveConfig);
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfig(live);

View File

@ -59,8 +59,8 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
super.setUp();
createReplicatedConfigs();
liveConfig.addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
backupConfig.addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
liveConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameLive + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
backupConfig.setResolveProtocols(true).addAcceptorConfiguration("amqp", smallFrameBackup + "?protocols=AMQP;useEpoll=false;maxFrameSize=512");
liveServer.start();
backupServer.start();

View File

@ -288,6 +288,7 @@ public class MessageExpirationTest extends ActiveMQTestBase {
server = createServer(true);
server.getConfiguration().addAcceptorConfiguration("amqp", "tcp://127.0.0.1:61616");
server.getConfiguration().setResolveProtocols(true);
server.getConfiguration().setMessageExpiryScanPeriod(200);
server.start();
locator = createInVMNonHALocator();

View File

@ -40,9 +40,9 @@ public class UpdateQueueTest extends ActiveMQTestBase {
@Test
public void testUpdateQueueWithNullUser() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQServer server = createServer(true, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0");
server.start();
@ -80,7 +80,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
Assert.assertEquals("newUser", user, queue.getUser());
factory = new ActiveMQConnectionFactory();
factory = new ActiveMQConnectionFactory("vm://0");
conn = factory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -110,9 +110,9 @@ public class UpdateQueueTest extends ActiveMQTestBase {
@Test
public void testUpdateQueue() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQServer server = createServer(true, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://0");
server.start();
@ -169,7 +169,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
Assert.assertEquals("newUser", queue.getUser().toString());
Assert.assertEquals(180L, queue.getRingSize());
factory = new ActiveMQConnectionFactory();
factory = new ActiveMQConnectionFactory("vm://0");
conn = factory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -86,7 +86,7 @@ public class MessageJournalTest extends ActiveMQTestBase {
@Test
public void testStoreAMQP() throws Throwable {
ActiveMQServer server = createServer(true);
ActiveMQServer server = createServer(true, true);
server.start();

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.junit.Test;
@ -34,8 +35,9 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
assertEquals(1, server.getPostOffice().getAllBindings().count());
Binding b = server.getPostOffice().getAllBindings().iterator().next();
Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah"));
assertEquals(1, bindings.size());
Binding b = bindings.getBindings().iterator().next();
//check that query using bare queue name works as before
QueueQueryResult result = server.queueQuery(b.getUniqueName());
assertTrue(result.isExists());
@ -126,8 +128,9 @@ public class MQTTFQQNTest extends MQTTTestSupport {
try {
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
assertEquals(1, server.getPostOffice().getAllBindings().count());
Binding b = server.getPostOffice().getAllBindings().iterator().next();
Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("foo.bah"));
assertEquals(1, bindings.size());
Binding b = bindings.getBindings().iterator().next();
//check ::queue
QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName()));

View File

@ -100,7 +100,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport {
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
if (protocolManager instanceof MQTTProtocolManager) {
sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates();
sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates();
}
}
assertEquals(1, sessionStates.size());
@ -132,7 +132,7 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport {
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
if (protocolManager instanceof MQTTProtocolManager) {
sessionStates = ((MQTTProtocolManager) protocolManager).getSessionStates();
sessionStates = ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates();
}
}
assertEquals(1, sessionStates.size());

View File

@ -2099,10 +2099,10 @@ public class MQTTTest extends MQTTTestSupport {
final int port2 = 1885;
final Configuration cfg1 = createDefaultConfig(1, false);
cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT");
cfg1.setResolveProtocols(true).addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT");
final Configuration cfg2 = createDefaultConfig(2, false);
cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT");
cfg2.setResolveProtocols(true).addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT");
final ActiveMQServer server1 = createServer(cfg1);
server1.start();

View File

@ -387,7 +387,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
if (protocolManager instanceof MQTTProtocolManager) {
return ((MQTTProtocolManager) protocolManager).getSessionStates();
return ((MQTTProtocolManager) protocolManager).getStateManager().getSessionStates();
}
}

View File

@ -68,14 +68,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics);
@ -258,14 +258,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size);
subConnection1 = null;
@ -456,14 +456,14 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
//Waiting for the first sub connection be closed
assertTrue(waitConnectionClosed(subConnection1));
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size);
subConnection1 = null;
subConnection2.subscribe(topics);
@ -622,7 +622,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -711,9 +711,9 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Thread.sleep(1000);
Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[1]).getStateManager().getConnectedClients()::size);
// Subscribe to topics
connection1.subscribe(topics);
@ -924,7 +924,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
Thread.sleep(1000);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(1, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
@ -1029,7 +1029,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size);
Wait.assertEquals(2, locateMQTTPM(servers[0]).getStateManager().getConnectedClients()::size);
subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
//Waiting for the first sub connection be closed

View File

@ -29,6 +29,7 @@ import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@ -221,7 +222,14 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
messageConsumer.close();
messageConsumerAllNews.close();
int countOfPageStores = server.getPagingManager().getStoreNames().length;
int countOfPageStores = 0;
SimpleString[] storeNames = server.getPagingManager().getStoreNames();
for (int i = 0; i < storeNames.length; i++) {
if (!storeNames[i].equals(SimpleString.toSimpleString(MQTTUtil.MQTT_SESSION_STORE))) {
countOfPageStores++;
}
}
assertEquals("there should be 5", 5, countOfPageStores);
connection.close();

View File

@ -45,6 +45,7 @@ import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.Test;
@ -82,6 +83,69 @@ public class MQTT5Test extends MQTT5TestSupport {
context.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testResumeSubscriptionsAfterRestart() throws Exception {
final int SUBSCRIPTION_COUNT = 100;
List<String> topicNames = new ArrayList<>(SUBSCRIPTION_COUNT);
for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
topicNames.add(getName() + i);
}
CountDownLatch latch = new CountDownLatch(SUBSCRIPTION_COUNT);
MqttClient consumer = createPahoClient("myConsumerID");
MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder()
.cleanStart(false)
.sessionExpiryInterval(999L)
.build();
consumer.connect(consumerOptions);
List<MqttSubscription> subs = new ArrayList<>(SUBSCRIPTION_COUNT);
for (String subName : topicNames) {
subs.add(new MqttSubscription(subName, 1));
}
consumer.subscribe(subs.toArray(new MqttSubscription[0]));
consumer.disconnect();
MqttClient producer = createPahoClient("myProducerID");
MqttConnectionOptions producerOptions = new MqttConnectionOptionsBuilder()
.sessionExpiryInterval(0L)
.build();
producer.connect(producerOptions);
for (String subName : topicNames) {
producer.publish(subName, new byte[0], 1, false);
}
producer.disconnect();
producer.close();
Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100);
server.stop();
server.start();
Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 2000, 100);
Wait.assertTrue(() -> getSessionStates().get("myConsumerID") != null, 2000, 100);
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
if (topicNames.remove(topic)) {
latch.countDown();
}
}
});
consumerOptions = new MqttConnectionOptionsBuilder()
.cleanStart(false)
.sessionExpiryInterval(0L)
.build();
consumer.connect(consumerOptions);
assertTrue(latch.await(2, TimeUnit.SECONDS));
consumer.unsubscribe(topicNames.toArray(new String[0]));
consumer.disconnect();
consumer.close();
Wait.assertEquals(0L, () -> server.locateQueue(MQTTUtil.MQTT_SESSION_STORE).getMessageCount(), 5000, 100);
}
/*
* Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testAddressAutoCreation() throws Exception {
final String DESTINATION = RandomUtil.randomString();
@ -470,6 +534,60 @@ public class MQTT5Test extends MQTT5TestSupport {
client.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testConnectionStealingOnMultipleAcceptors() throws Exception {
int secondaryPort = 1884;
final String CLIENT_ID = RandomUtil.randomString();
server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort);
server.getRemotingService().startAcceptors();
MqttClient client = createPahoClient(CLIENT_ID);
client.connect();
MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort);
client2.connect();
// only 1 session should exist
Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100);
assertNotNull(getSessionStates().get(CLIENT_ID));
assertFalse(client.isConnected());
client.close();
client2.disconnect();
client2.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testConnectionStealingDisabledOnMultipleAcceptors() throws Exception {
int secondaryPort = 1884;
final String CLIENT_ID = RandomUtil.randomString();
server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + secondaryPort + "?allowLinkStealing=false");
server.getRemotingService().startAcceptors();
MqttClient client = createPahoClient(CLIENT_ID);
client.connect();
MqttClient client2 = createPahoClient(CLIENT_ID, secondaryPort);
try {
client2.connect();
fail("Should have thrown an exception on connect due to disabled link stealing");
} catch (Exception e) {
// ignore expected exception
}
// only 1 session should exist
Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100);
assertNotNull(getSessionStates().get(CLIENT_ID));
assertTrue(client.isConnected());
client.disconnect();
client.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testQueueCleanedUpOnConsumerFail() throws Exception {
final String topic = getName();

View File

@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
}
protected MqttClient createPahoClient(String clientId, int port) throws MqttException {
return new MqttClient(protocol + "://localhost:" + port, clientId, new MemoryPersistence());
}
protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException {
return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
}
@ -333,12 +337,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
if (protocolManager == null) {
return Collections.emptyMap();
} else {
return protocolManager.getSessionStates();
return protocolManager.getStateManager().getSessionStates();
}
}
public void scanSessions() {
getProtocolManager().scanSessions();
getProtocolManager().getStateManager().scanSessions();
}
public MQTTProtocolManager getProtocolManager() {

View File

@ -76,7 +76,13 @@ public class MessageReceiptTests extends MQTT5TestSupport {
for (int i = 0; i < CONSUMER_COUNT; i++) {
producer.publish(TOPIC + i, ("hello" + i).getBytes(), 0, false);
}
Wait.assertEquals((long) CONSUMER_COUNT, () -> server.getActiveMQServerControl().getTotalMessagesAdded(), 2000, 100);
Wait.assertEquals((long) CONSUMER_COUNT, () -> {
int totalMessagesAdded = 0;
for (int i = 0; i < CONSUMER_COUNT; i++) {
totalMessagesAdded += getSubscriptionQueue(TOPIC + i).getMessagesAdded();
}
return totalMessagesAdded;
}, 2000, 100);
producer.disconnect();
producer.close();

View File

@ -99,7 +99,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase {
@Test
public void testJAASSecurityManagerAuthentication() throws Exception {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false));
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", URL + "?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false));
server.start();
try (Connection c = cf.createConnection("first", "secret")) {
Thread.sleep(200);
@ -113,7 +113,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase {
final SimpleString ADDRESS = new SimpleString("address");
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager();
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false));
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin").setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false));
Set<Role> roles = new HashSet<>();
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles("#", roles);
@ -163,7 +163,7 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase {
public void testJAASSecurityManagerAuthorizationPositive() throws Exception {
final String ADDRESS = "address";
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false));
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setSecurityEnabled(true).setResolveProtocols(true).addAcceptorConfiguration("netty", "tcp://127.0.0.1:61616?securityDomain=PropertiesLogin"), ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(), false));
Set<Role> roles = new HashSet<>();
roles.add(new Role("programmers", true, true, true, true, true, true, true, true, true, true));
server.getConfiguration().putSecurityRoles("#", roles);

View File

@ -328,7 +328,7 @@ public class SecurityTest extends ActiveMQTestBase {
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass");
params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
// ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory
HashSet<Role> roles = new HashSet<>();
@ -374,7 +374,7 @@ public class SecurityTest extends ActiveMQTestBase {
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "securepass");
params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
server.getConfiguration().setResolveProtocols(true).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
server.start();
ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616?verifyHostName=false");