ARTEMIS-2686 Fix MQTT connect message rejection
Initialize the session state with a default value to fix a NPE, when an incoming MQTT interceptor rejects a MqttConnectMessage.
This commit is contained in:
parent
a4f0e7660e
commit
c36170477d
|
@ -77,6 +77,8 @@ public class MQTTSession {
|
|||
subscriptionManager = new MQTTSubscriptionManager(this);
|
||||
retainMessageManager = new MQTTRetainMessageManager(this);
|
||||
|
||||
state = MQTTSessionState.DEFAULT;
|
||||
|
||||
log.debug("SESSION CREATED: " + id);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
|||
|
||||
public class MQTTSessionState {
|
||||
|
||||
public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);
|
||||
|
||||
private String clientId;
|
||||
|
||||
private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttConnectMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
|
@ -61,4 +64,40 @@ public class MQTTRejectingInterceptorTest extends MQTTTestSupport {
|
|||
subscribeProvider.disconnect();
|
||||
publishProvider.disconnect();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRejectedMqttConnectMessage() throws Exception {
|
||||
CountDownLatch publishThreadReady = new CountDownLatch(1);
|
||||
|
||||
server.getRemotingService().addIncomingInterceptor((MQTTInterceptor) (packet, connection) -> {
|
||||
if (packet.getClass() == MqttConnectMessage.class) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
Thread publishThread = new Thread(() -> {
|
||||
MQTTClientProvider publishProvider = getMQTTClientProvider();
|
||||
|
||||
publishThreadReady.countDown();
|
||||
|
||||
try {
|
||||
initializeConnection(publishProvider);
|
||||
publishProvider.disconnect();
|
||||
fail("The connection should be rejected!");
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
});
|
||||
|
||||
publishThread.start();
|
||||
|
||||
publishThreadReady.await();
|
||||
|
||||
publishThread.join(3000);
|
||||
|
||||
if (publishThread.isAlive()) {
|
||||
fail("The connection is stuck!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue