This commit is contained in:
Clebert Suconic 2018-06-22 15:47:15 -04:00
commit 532317ceff
7 changed files with 89 additions and 21 deletions

View File

@ -54,11 +54,14 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public final class StompConnection implements RemotingConnection {
private static final Logger logger = Logger.getLogger(StompConnection.class);
protected static final String CONNECTION_ID_PROP = "__AMQ_CID";
private static final String SERVER_NAME = "ActiveMQ-Artemis/" + VersionLoader.getVersion().getFullVersion() +
" ActiveMQ Artemis Messaging Engine";
@ -582,6 +585,27 @@ public final class StompConnection implements RemotingConnection {
}
}
public void logFrame(StompFrame request, boolean in) {
if (logger.isDebugEnabled()) {
StringBuilder message = new StringBuilder()
.append("STOMP(")
.append(getRemoteAddress())
.append(", ")
.append(this.getID())
.append("):");
if (in) {
message.append(" IN << ");
} else {
message.append("OUT >> ");
}
message.append(request);
logger.debug(message.toString());
}
}
public void sendFrame(StompFrame frame, StompPostReceiptFunction function) {
manager.sendReply(this, frame, function);
}

View File

@ -77,8 +77,12 @@ public class StompFrame {
@Override
public String toString() {
return "StompFrame[command=" + command + ", headers=" + headers + ", content= " + this.body + " bytes " +
Arrays.toString(bytesBody);
return new StringBuilder()
.append("StompFrame[command=").append(command)
.append(", headers=").append(headers)
.append(", content= ").append(this.body)
.append(", bytes= ").append(Arrays.toString(bytesBody))
.toString();
}
public boolean isPing() {

View File

@ -155,6 +155,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
try {
invokeInterceptors(this.incomingInterceptors, request, conn);
conn.logFrame(request, true);
conn.handleFrame(request);
} finally {
server.getStorageManager().clearContext();
@ -186,11 +187,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
// Public --------------------------------------------------------
public boolean send(final StompConnection connection, final StompFrame frame) {
if (ActiveMQStompProtocolLogger.LOGGER.isTraceEnabled()) {
ActiveMQStompProtocolLogger.LOGGER.trace("sent " + frame);
}
invokeInterceptors(this.outgoingInterceptors, frame, connection);
connection.logFrame(frame, false);
synchronized (connection) {
if (connection.isDestroyed()) {

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
@ -255,14 +256,12 @@ public class StompSession implements SessionCallback {
SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
boolean pubSub = false;
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (topic) {
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (multicast) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
@ -276,8 +275,8 @@ public class StompSession implements SessionCallback {
session.createQueue(address, queueName, selectorSimple, true, false);
}
}
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast);
subscriptions.put(consumerID, subscription);
session.start();
return () -> consumer.receiveCredits(receiveCredits);
@ -295,14 +294,15 @@ public class StompSession implements SessionCallback {
iterator.remove();
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
Queue queue = manager.getServer().locateQueue(queueName);
if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) {
session.deleteQueue(queueName);
}
result = true;
}
}
if (!result && durableSubscriptionName != null && clientID != null) {
if (durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);

View File

@ -29,18 +29,18 @@ public class StompSubscription {
private final SimpleString queueName;
// whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
private final boolean pubSub;
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.pubSub = pubSub;
this.multicast = multicast;
}
// Public --------------------------------------------------------
@ -57,13 +57,13 @@ public class StompSubscription {
return queueName;
}
public boolean isPubSub() {
return pubSub;
public boolean isMulticast() {
return multicast;
}
@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]";
}
}

View File

@ -38,6 +38,14 @@ In Apache ActiveMQ Artemis, these destinations are mapped to *addresses* and
*queues* depending on the operation being done and the desired semantics (e.g.
anycast or multicast).
## Logging
Incoming and outgoing STOMP frames can be logged by enabling `DEBUG` for
`org.apache.activemq.artemis.core.protocol.stomp.StompConnection`. This can be
extremely useful for debugging or simply monitoring client activity. Along with
the STOMP frame itself the remote IP address of the client is logged as well as
the internal connection ID so that frames from the same client can be correlated.
## Sending
When a STOMP client sends a message (using a `SEND` frame), the protocol

View File

@ -1343,6 +1343,40 @@ public class StompTest extends StompTestBase {
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testDurableUnSubscribeWithoutDurableSubName() throws Exception {
server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/');
server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST));
conn.connect(defUser, defPass, "myclientid");
String subId = UUID.randomUUID().toString();
String durableSubName = UUID.randomUUID().toString();
String receipt = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
receipt = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
conn.disconnect();
// make sure the durable subscription queue is still there
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
}
@Test
public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");