From 1ed7a616ee5e9beedb173c56a9226e314339a8e4 Mon Sep 17 00:00:00 2001 From: Justin Bertram <jbertram@apache.org> Date: Mon, 18 Jun 2018 15:19:48 -0500 Subject: [PATCH 1/2] ARTEMIS-1930 require STOMP durable sub name to unsubscribe --- .../core/protocol/stomp/StompSession.java | 16 ++++----- .../protocol/stomp/StompSubscription.java | 14 ++++---- .../tests/integration/stomp/StompTest.java | 34 +++++++++++++++++++ 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index e370c812f2..291634f618 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; @@ -255,14 +256,12 @@ public class StompSession implements SessionCallback { SimpleString address = SimpleString.toSimpleString(destination); SimpleString queueName = SimpleString.toSimpleString(destination); SimpleString selectorSimple = SimpleString.toSimpleString(selector); - boolean pubSub = false; final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits; Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes(); - boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); - if (topic) { + boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); + if (multicast) { // subscribes to a topic - pubSub = true; if (durableSubscriptionName != null) { if (clientID == null) { throw BUNDLE.missingClientID(); @@ -276,8 +275,8 @@ public class StompSession implements SessionCallback { session.createQueue(address, queueName, selectorSimple, true, false); } } - final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0); - StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub); + final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0); + StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast); subscriptions.put(consumerID, subscription); session.start(); return () -> consumer.receiveCredits(receiveCredits); @@ -295,14 +294,15 @@ public class StompSession implements SessionCallback { iterator.remove(); SimpleString queueName = sub.getQueueName(); session.closeConsumer(consumerID); - if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) { + Queue queue = manager.getServer().locateQueue(queueName); + if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) { session.deleteQueue(queueName); } result = true; } } - if (!result && durableSubscriptionName != null && clientID != null) { + if (durableSubscriptionName != null && clientID != null) { SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); if (manager.getServer().locateQueue(queueName) != null) { session.deleteQueue(queueName); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java index a1417adc9f..de6044bfbe 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java @@ -29,18 +29,18 @@ public class StompSubscription { private final SimpleString queueName; - // whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic) - private final boolean pubSub; + // whether or not this subscription follows multicast semantics (e.g. for a JMS topic) + private final boolean multicast; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) { + public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) { this.subID = subID; this.ack = ack; this.queueName = queueName; - this.pubSub = pubSub; + this.multicast = multicast; } // Public -------------------------------------------------------- @@ -57,13 +57,13 @@ public class StompSubscription { return queueName; } - public boolean isPubSub() { - return pubSub; + public boolean isMulticast() { + return multicast; } @Override public String toString() { - return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]"; + return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]"; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index bc363f21ef..5c6eefec85 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1343,6 +1343,40 @@ public class StompTest extends StompTestBase { assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); } + @Test + public void testDurableUnSubscribeWithoutDurableSubName() throws Exception { + server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/'); + server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST)); + conn.connect(defUser, defPass, "myclientid"); + String subId = UUID.randomUUID().toString(); + String durableSubName = UUID.randomUUID().toString(); + String receipt = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo") + .addHeader(Stomp.Headers.Unsubscribe.ID, subId) + .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL) + .addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt); + + frame = conn.sendFrame(frame); + assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100)); + + receipt = UUID.randomUUID().toString(); + frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE) + .addHeader(Stomp.Headers.Unsubscribe.ID, subId) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt); + + frame = conn.sendFrame(frame); + assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + conn.disconnect(); + + // make sure the durable subscription queue is still there + assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100)); + } + @Test public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception { conn.connect(defUser, defPass, "myclientid"); From c0d28432ad113190ba51777c7e92f38be87e933d Mon Sep 17 00:00:00 2001 From: Justin Bertram <jbertram@apache.org> Date: Mon, 18 Jun 2018 19:14:15 -0500 Subject: [PATCH 2/2] NO-JIRA STOMP frame logging --- .../core/protocol/stomp/StompConnection.java | 24 +++++++++++++++++++ .../core/protocol/stomp/StompFrame.java | 8 +++++-- .../protocol/stomp/StompProtocolManager.java | 6 ++--- docs/user-manual/en/stomp.md | 8 +++++++ 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index f7accb1ea6..92b6edc46f 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -54,11 +54,14 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.VersionLoader; +import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; public final class StompConnection implements RemotingConnection { + private static final Logger logger = Logger.getLogger(StompConnection.class); + protected static final String CONNECTION_ID_PROP = "__AMQ_CID"; private static final String SERVER_NAME = "ActiveMQ-Artemis/" + VersionLoader.getVersion().getFullVersion() + " ActiveMQ Artemis Messaging Engine"; @@ -582,6 +585,27 @@ public final class StompConnection implements RemotingConnection { } } + public void logFrame(StompFrame request, boolean in) { + if (logger.isDebugEnabled()) { + StringBuilder message = new StringBuilder() + .append("STOMP(") + .append(getRemoteAddress()) + .append(", ") + .append(this.getID()) + .append("):"); + + if (in) { + message.append(" IN << "); + } else { + message.append("OUT >> "); + } + + message.append(request); + + logger.debug(message.toString()); + } + } + public void sendFrame(StompFrame frame, StompPostReceiptFunction function) { manager.sendReply(this, frame, function); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java index 439eba2823..1ba5d382d8 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java @@ -77,8 +77,12 @@ public class StompFrame { @Override public String toString() { - return "StompFrame[command=" + command + ", headers=" + headers + ", content= " + this.body + " bytes " + - Arrays.toString(bytesBody); + return new StringBuilder() + .append("StompFrame[command=").append(command) + .append(", headers=").append(headers) + .append(", content= ").append(this.body) + .append(", bytes= ").append(Arrays.toString(bytesBody)) + .toString(); } public boolean isPing() { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 62df6e5a4e..19a00626db 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -155,6 +155,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St try { invokeInterceptors(this.incomingInterceptors, request, conn); + conn.logFrame(request, true); conn.handleFrame(request); } finally { server.getStorageManager().clearContext(); @@ -186,11 +187,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St // Public -------------------------------------------------------- public boolean send(final StompConnection connection, final StompFrame frame) { - if (ActiveMQStompProtocolLogger.LOGGER.isTraceEnabled()) { - ActiveMQStompProtocolLogger.LOGGER.trace("sent " + frame); - } - invokeInterceptors(this.outgoingInterceptors, frame, connection); + connection.logFrame(frame, false); synchronized (connection) { if (connection.isDestroyed()) { diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md index 4abe8d4011..27db086b07 100644 --- a/docs/user-manual/en/stomp.md +++ b/docs/user-manual/en/stomp.md @@ -38,6 +38,14 @@ In Apache ActiveMQ Artemis, these destinations are mapped to *addresses* and *queues* depending on the operation being done and the desired semantics (e.g. anycast or multicast). +## Logging + +Incoming and outgoing STOMP frames can be logged by enabling `DEBUG` for +`org.apache.activemq.artemis.core.protocol.stomp.StompConnection`. This can be +extremely useful for debugging or simply monitoring client activity. Along with +the STOMP frame itself the remote IP address of the client is logged as well as +the internal connection ID so that frames from the same client can be correlated. + ## Sending When a STOMP client sends a message (using a `SEND` frame), the protocol