From 5b2c47567dbc318c1c4c1ff821e072e74675ad34 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 11 Mar 2019 18:00:30 +0100 Subject: [PATCH] ARTEMIS-1604 Artemis deadlock using MQTT Protocol MQTT shouldn't support direct deliveries --- .../protocol/mqtt/MQTTSessionCallback.java | 5 +++ .../core/protocol/stomp/StompSession.java | 5 +++ .../artemis/core/server/impl/QueueImpl.java | 2 +- .../integration/mqtt/imported/MQTTTest.java | 20 ++++++++++++ .../tests/integration/stomp/StompTest.java | 32 +++++++++++++++++++ 5 files changed, 63 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index a49cf116c6..50d57327a4 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -36,6 +36,11 @@ public class MQTTSessionCallback implements SessionCallback { this.connection = connection; } + @Override + public boolean supportsDirectDelivery() { + return false; + } + @Override public boolean isWritable(ReadyListener callback, Object protocolContext) { return connection.isWritable(callback); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index b355168915..80bbbe8613 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -79,6 +79,11 @@ public class StompSession implements SessionCallback { this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration()); } + @Override + public boolean supportsDirectDelivery() { + return false; + } + @Override public boolean isWritable(ReadyListener callback, Object protocolContext) { return connection.isWritable(callback); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 92fc9bf8ba..7e736ca113 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2348,7 +2348,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean isDirectDeliver() { - return directDeliver; + return directDeliver && supportsDirectDeliver; } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 9fc6cfd4d3..03bcddd275 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -43,6 +43,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -63,6 +65,7 @@ import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.fusesource.mqtt.codec.PUBLISH; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -151,6 +154,23 @@ public class MQTTTest extends MQTTTestSupport { publishProvider.disconnect(); } + @Test(timeout = 60 * 1000) + public void testDirectDeliverFalse() throws Exception { + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); + + + for (Binding b : server.getPostOffice().getAllBindings().values()) { + if (b instanceof QueueBinding) { + Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); + } + } + + subscriptionProvider.disconnect(); + } + @Test(timeout = 60 * 1000) public void testUnsubscribeMQTT() throws Exception { final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 16ffca243c..ba0616f066 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -46,6 +46,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; @@ -2009,4 +2011,34 @@ public class StompTest extends StompTestBase { assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST)); Assert.assertNull(server.locateQueue(simpleQueueName)); } + + + + @Test + public void directDeliverDisabledOnStomp() throws Exception { + String payload = "This is a test message"; + + // Set up STOMP subscription + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); + + for (Binding b : server.getPostOffice().getAllBindings().values()) { + if (b instanceof QueueBinding) { + Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver()); + } + } + + // Send MQTT Message + MQTTClientProvider clientProvider = new FuseMQTTClientProvider(); + clientProvider.connect("tcp://" + hostname + ":" + port); + clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0); + clientProvider.disconnect(); + + // Receive STOMP Message + ClientStompFrame frame = conn.receiveFrame(); + assertTrue(frame.getBody() + .contains(payload)); + + } + }