diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 8efea0a9ae..c24a684c4f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -58,15 +58,15 @@ public class MQTTConnectionManager { /** * Handles the connect packet. See spec for details on each of parameters. */ - synchronized void connect(String cId, - String username, - byte[] passwordInBytes, - boolean will, - byte[] willMessage, - String willTopic, - boolean willRetain, - int willQosLevel, - boolean cleanSession) throws Exception { + void connect(String cId, + String username, + byte[] passwordInBytes, + boolean will, + byte[] willMessage, + String willTopic, + boolean willRetain, + int willQosLevel, + boolean cleanSession) throws Exception { String clientId = validateClientId(cId, cleanSession); if (clientId == null) { session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); @@ -74,34 +74,36 @@ public class MQTTConnectionManager { return; } - String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8); - session.getConnection().setClientID(clientId); - ServerSessionImpl serverSession = createServerSession(username, password); - serverSession.start(); - session.setServerSession(serverSession); + MQTTSessionState sessionState = getSessionState(clientId); + synchronized (sessionState) { + session.setSessionState(sessionState); + String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8); + session.getConnection().setClientID(clientId); + ServerSessionImpl serverSession = createServerSession(username, password); + serverSession.start(); + session.setServerSession(serverSession); - session.setSessionState(getSessionState(clientId)); + if (cleanSession) { + /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and + * start a new one. This Session lasts as long as the Network Connection. State data associated with this Session + * MUST NOT be reused in any subsequent Session */ + session.clean(); + session.setClean(true); + } - if (cleanSession) { - /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and - * start a new one. This Session lasts as long as the Network Connection. State data associated with this Session - * MUST NOT be reused in any subsequent Session */ - session.clean(); - session.setClean(true); + if (will) { + isWill = true; + this.willMessage = ByteBufAllocator.DEFAULT.buffer(willMessage.length); + this.willMessage.writeBytes(willMessage); + this.willQoSLevel = willQosLevel; + this.willRetain = willRetain; + this.willTopic = willTopic; + } + + session.getConnection().setConnected(true); + session.start(); + session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED); } - - if (will) { - isWill = true; - this.willMessage = ByteBufAllocator.DEFAULT.buffer(willMessage.length); - this.willMessage.writeBytes(willMessage); - this.willQoSLevel = willQosLevel; - this.willRetain = willRetain; - this.willTopic = willTopic; - } - - session.getConnection().setConnected(true); - session.start(); - session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED); } /** @@ -133,35 +135,37 @@ public class MQTTConnectionManager { return (ServerSessionImpl) serverSession; } - synchronized void disconnect(boolean failure) { + void disconnect(boolean failure) { if (session == null || session.getStopped()) { return; } - try { - if (isWill && failure) { - session.getMqttPublishManager().sendInternal(0, willTopic, willQoSLevel, willMessage, willRetain, true); - } - session.stop(); - session.getConnection().destroy(); - } catch (Exception e) { - log.error("Error disconnecting client: " + e.getMessage()); - } finally { - if (session.getSessionState() != null) { - session.getSessionState().setAttached(false); - String clientId = session.getSessionState().getClientId(); - /** - * ensure that the connection for the client ID matches *this* connection otherwise we could remove the - * entry for the client who "stole" this client ID via [MQTT-3.1.4-2] - */ - if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) { - session.getProtocolManager().removeConnectedClient(clientId); + synchronized (session.getSessionState()) { + try { + if (isWill && failure) { + session.getMqttPublishManager().sendInternal(0, willTopic, willQoSLevel, willMessage, willRetain, true); + } + session.stop(); + session.getConnection().destroy(); + } catch (Exception e) { + log.error("Error disconnecting client: " + e.getMessage()); + } finally { + if (session.getSessionState() != null) { + session.getSessionState().setAttached(false); + String clientId = session.getSessionState().getClientId(); + /** + * ensure that the connection for the client ID matches *this* connection otherwise we could remove the + * entry for the client who "stole" this client ID via [MQTT-3.1.4-2] + */ + if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) { + session.getProtocolManager().removeConnectedClient(clientId); + } } } } } - private MQTTSessionState getSessionState(String clientId) { + private synchronized MQTTSessionState getSessionState(String clientId) { return session.getProtocolManager().getSessionState(clientId); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 4093f5e28c..501c26bf85 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -182,24 +183,37 @@ public class MQTTSubscriptionManager { } void removeSubscriptions(List topics) throws Exception { - for (String topic : topics) { - removeSubscription(topic); + synchronized (session.getSessionState()) { + for (String topic : topics) { + removeSubscription(topic); + } } } - // FIXME: Do we need this synchronzied? - private synchronized void removeSubscription(String address) throws Exception { + private void removeSubscription(String address) throws Exception { String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration()); - SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); - - ServerConsumer consumer = consumers.get(address); - consumers.remove(address); - if (consumer != null) { - consumer.close(false); - consumerQoSLevels.remove(consumer.getID()); + SimpleString sAddress = SimpleString.toSimpleString(internalAddress); + AddressInfo addressInfo = session.getServerSession().getAddress(sAddress); + if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) { + ServerConsumer consumer = consumers.get(address); + consumers.remove(address); + if (consumer != null) { + consumer.close(false); + consumerQoSLevels.remove(consumer.getID()); + } + } else { + consumers.remove(address); + Queue queue = session.getServer().locateQueue(internalQueueName); + Set queueConsumers; + if (queue != null && (queueConsumers = (Set) queue.getConsumers()) != null) { + for (Consumer consumer : queueConsumers) { + ((ServerConsumer) consumer).close(false); + consumerQoSLevels.remove(((ServerConsumer) consumer).getID()); + } + } } if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) { @@ -219,13 +233,15 @@ public class MQTTSubscriptionManager { * @throws Exception */ int[] addSubscriptions(List subscriptions) throws Exception { - int[] qos = new int[subscriptions.size()]; + synchronized (session.getSessionState()) { + int[] qos = new int[subscriptions.size()]; - for (int i = 0; i < subscriptions.size(); i++) { - addSubscription(subscriptions.get(i)); - qos[i] = subscriptions.get(i).qualityOfService().value(); + for (int i = 0; i < subscriptions.size(); i++) { + addSubscription(subscriptions.get(i)); + qos[i] = subscriptions.get(i).qualityOfService().value(); + } + return qos; } - return qos; } Map getConsumerQoSLevels() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java new file mode 100644 index 0000000000..cb97e16bdd --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class MQTTQueueCleanTest extends MQTTTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTQueueCleanTest.class); + + @Test + public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception { + Random random = new Random(); + Set clientProviders = new HashSet<>(11); + int repeatCount = 0; + String address = "clean/test"; + String clientId = "sameClientId"; + String queueName = "::sameClientId.clean.test"; + //The abnormal scene does not necessarily occur, repeating 100 times to ensure the recurrence of the abnormality + while (repeatCount < 100) { + repeatCount++; + int subConnectionCount = random.nextInt(50) + 1; + int sC = 0; + try { + //Reconnect at least twice to reproduce the problem + while (sC < subConnectionCount) { + sC++; + MQTTClientProvider clientProvider = getMQTTClientProvider(); + clientProvider.setClientId(clientId); + initializeConnection(clientProvider); + clientProviders.add(clientProvider); + clientProvider.subscribe(address, AT_LEAST_ONCE); + } + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } finally { + for (MQTTClientProvider clientProvider : clientProviders) { + clientProvider.disconnect(); + } + clientProviders.clear(); + assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10)); + } + } + } + +}