ARTEMIS-1604 Artemis deadlock using MQTT Protocol

MQTT shouldn't support direct deliveries
This commit is contained in:
Francesco Nigro 2019-03-11 18:00:30 +01:00 committed by Clebert Suconic
parent 0872f749a9
commit 5b2c47567d
5 changed files with 63 additions and 1 deletions

View File

@ -36,6 +36,11 @@ public class MQTTSessionCallback implements SessionCallback {
this.connection = connection; this.connection = connection;
} }
@Override
public boolean supportsDirectDelivery() {
return false;
}
@Override @Override
public boolean isWritable(ReadyListener callback, Object protocolContext) { public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback); return connection.isWritable(callback);

View File

@ -79,6 +79,11 @@ public class StompSession implements SessionCallback {
this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration()); this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
} }
@Override
public boolean supportsDirectDelivery() {
return false;
}
@Override @Override
public boolean isWritable(ReadyListener callback, Object protocolContext) { public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback); return connection.isWritable(callback);

View File

@ -2348,7 +2348,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public boolean isDirectDeliver() { public boolean isDirectDeliver() {
return directDeliver; return directDeliver && supportsDirectDeliver;
} }
/** /**

View File

@ -43,6 +43,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -63,6 +65,7 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH; import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -151,6 +154,23 @@ public class MQTTTest extends MQTTTestSupport {
publishProvider.disconnect(); publishProvider.disconnect();
} }
@Test(timeout = 60 * 1000)
public void testDirectDeliverFalse() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
}
subscriptionProvider.disconnect();
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testUnsubscribeMQTT() throws Exception { public void testUnsubscribeMQTT() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();

View File

@ -46,6 +46,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
@ -2009,4 +2011,34 @@ public class StompTest extends StompTestBase {
assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST)); assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST));
Assert.assertNull(server.locateQueue(simpleQueueName)); Assert.assertNull(server.locateQueue(simpleQueueName));
} }
@Test
public void directDeliverDisabledOnStomp() throws Exception {
String payload = "This is a test message";
// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
}
// Send MQTT Message
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://" + hostname + ":" + port);
clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0);
clientProvider.disconnect();
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
assertTrue(frame.getBody()
.contains(payload));
}
} }