This commit is contained in:
Clebert Suconic 2020-04-08 15:45:19 -04:00
commit 598c2631ad
3 changed files with 43 additions and 0 deletions

View File

@ -77,6 +77,8 @@ public class MQTTSession {
subscriptionManager = new MQTTSubscriptionManager(this); subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this); retainMessageManager = new MQTTRetainMessageManager(this);
state = MQTTSessionState.DEFAULT;
log.debug("SESSION CREATED: " + id); log.debug("SESSION CREATED: " + id);
} }

View File

@ -32,6 +32,8 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration;
public class MQTTSessionState { public class MQTTSessionState {
public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);
private String clientId; private String clientId;
private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>(); private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.tests.integration.mqtt.imported; package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import java.util.concurrent.CountDownLatch;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -61,4 +64,40 @@ public class MQTTRejectingInterceptorTest extends MQTTTestSupport {
subscribeProvider.disconnect(); subscribeProvider.disconnect();
publishProvider.disconnect(); publishProvider.disconnect();
} }
@Test(timeout = 60000)
public void testRejectedMqttConnectMessage() throws Exception {
CountDownLatch publishThreadReady = new CountDownLatch(1);
server.getRemotingService().addIncomingInterceptor((MQTTInterceptor) (packet, connection) -> {
if (packet.getClass() == MqttConnectMessage.class) {
return false;
} else {
return true;
}
});
Thread publishThread = new Thread(() -> {
MQTTClientProvider publishProvider = getMQTTClientProvider();
publishThreadReady.countDown();
try {
initializeConnection(publishProvider);
publishProvider.disconnect();
fail("The connection should be rejected!");
} catch (Exception ignore) {
}
});
publishThread.start();
publishThreadReady.await();
publishThread.join(3000);
if (publishThread.isAlive()) {
fail("The connection is stuck!");
}
}
} }