ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared, and caused Artemis broker to throw a NullPointerException.

When the MQTT consumer client (cleanSession property set to true) reconnected, there are certain probabilities that these two bugs will occur.
This is because the MQTT consumer client thinks that its connection has been disconnected and triggers reconnection, but the MQTT connection is still alive at Artemis broker. This bug occurs when new and old connections occur while operating the same queue for unsafe behavior.
This commit is contained in:
onlyMIT 2018-12-19 16:26:12 +08:00 committed by Clebert Suconic
parent 2a3ce34a58
commit 971f673c60
3 changed files with 159 additions and 70 deletions

View File

@ -58,7 +58,7 @@ public class MQTTConnectionManager {
/**
* Handles the connect packet. See spec for details on each of parameters.
*/
synchronized void connect(String cId,
void connect(String cId,
String username,
byte[] passwordInBytes,
boolean will,
@ -74,14 +74,15 @@ public class MQTTConnectionManager {
return;
}
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
session.getConnection().setClientID(clientId);
ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start();
session.setServerSession(serverSession);
session.setSessionState(getSessionState(clientId));
if (cleanSession) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
@ -103,6 +104,7 @@ public class MQTTConnectionManager {
session.start();
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
}
/**
* Creates an internal Server Session.
@ -133,11 +135,12 @@ public class MQTTConnectionManager {
return (ServerSessionImpl) serverSession;
}
synchronized void disconnect(boolean failure) {
void disconnect(boolean failure) {
if (session == null || session.getStopped()) {
return;
}
synchronized (session.getSessionState()) {
try {
if (isWill && failure) {
session.getMqttPublishManager().sendInternal(0, willTopic, willQoSLevel, willMessage, willRetain, true);
@ -160,8 +163,9 @@ public class MQTTConnectionManager {
}
}
}
}
private MQTTSessionState getSessionState(String clientId) {
private synchronized MQTTSessionState getSessionState(String clientId) {
return session.getProtocolManager().getSessionState(clientId);
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -182,25 +183,38 @@ public class MQTTSubscriptionManager {
}
void removeSubscriptions(List<String> topics) throws Exception {
synchronized (session.getSessionState()) {
for (String topic : topics) {
removeSubscription(topic);
}
}
}
// FIXME: Do we need this synchronzied?
private synchronized void removeSubscription(String address) throws Exception {
private void removeSubscription(String address) throws Exception {
String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration());
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
session.getSessionState().removeSubscription(address);
SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
AddressInfo addressInfo = session.getServerSession().getAddress(sAddress);
if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
ServerConsumer consumer = consumers.get(address);
consumers.remove(address);
if (consumer != null) {
consumer.close(false);
consumerQoSLevels.remove(consumer.getID());
}
} else {
consumers.remove(address);
Queue queue = session.getServer().locateQueue(internalQueueName);
Set<Consumer> queueConsumers;
if (queue != null && (queueConsumers = (Set<Consumer>) queue.getConsumers()) != null) {
for (Consumer consumer : queueConsumers) {
((ServerConsumer) consumer).close(false);
consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
}
}
}
if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
session.getServerSession().deleteQueue(internalQueueName);
@ -219,6 +233,7 @@ public class MQTTSubscriptionManager {
* @throws Exception
*/
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception {
synchronized (session.getSessionState()) {
int[] qos = new int[subscriptions.size()];
for (int i = 0; i < subscriptions.size(); i++) {
@ -227,6 +242,7 @@ public class MQTTSubscriptionManager {
}
return qos;
}
}
Map<Long, Integer> getConsumerQoSLevels() {
return consumerQoSLevels;

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
public class MQTTQueueCleanTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTQueueCleanTest.class);
@Test
public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception {
Random random = new Random();
Set<MQTTClientProvider> clientProviders = new HashSet<>(11);
int repeatCount = 0;
String address = "clean/test";
String clientId = "sameClientId";
String queueName = "::sameClientId.clean.test";
//The abnormal scene does not necessarily occur, repeating 100 times to ensure the recurrence of the abnormality
while (repeatCount < 100) {
repeatCount++;
int subConnectionCount = random.nextInt(50) + 1;
int sC = 0;
try {
//Reconnect at least twice to reproduce the problem
while (sC < subConnectionCount) {
sC++;
MQTTClientProvider clientProvider = getMQTTClientProvider();
clientProvider.setClientId(clientId);
initializeConnection(clientProvider);
clientProviders.add(clientProvider);
clientProvider.subscribe(address, AT_LEAST_ONCE);
}
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
} finally {
for (MQTTClientProvider clientProvider : clientProviders) {
clientProvider.disconnect();
}
clientProviders.clear();
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10));
}
}
}
}