diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 39b6d09b95..b3deac40c0 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -22,7 +22,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -116,8 +116,10 @@ public class ProtocolConverter { private final Map transactions = new ConcurrentHashMap<>(); private final StompTransport stompTransport; - private final ConcurrentMap pedingAcks = new ConcurrentHashMap<>(); - private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); + // Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id + private final ConcurrentMap pendingAcksTracker = new ConcurrentHashMap<>(); + // Read-Only view used in this class to enforce the separation of read vs update of the global index. + private final Map pendingAcks = Collections.unmodifiableMap(pendingAcksTracker); private final Object commnadIdMutex = new Object(); private int lastCommandId; @@ -131,34 +133,6 @@ public class ProtocolConverter { private float hbGracePeriodMultiplier = 1.0f; private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; - private static class AckEntry { - - private final String messageId; - private final StompSubscription subscription; - - public AckEntry(String messageId, StompSubscription subscription) { - this.messageId = messageId; - this.subscription = subscription; - } - - public MessageAck onMessageAck(TransactionId transactionId) { - return subscription.onStompMessageAck(messageId, transactionId); - } - - public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { - return subscription.onStompMessageNack(messageId, transactionId); - } - - public String getMessageId() { - return this.messageId; - } - - @SuppressWarnings("unused") - public StompSubscription getSubscription() { - return this.subscription; - } - } - public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { this.stompTransport = stompTransport; this.brokerContext = brokerContext; @@ -387,9 +361,9 @@ public class ProtocolConverter { boolean nacked = false; if (ackId != null) { - AckEntry pendingAck = this.pedingAcks.remove(ackId); + StompAckEntry pendingAck = this.pendingAcks.get(ackId); if (pendingAck != null) { - messageId = pendingAck.getMessageId(); + messageId = pendingAck.getMessageId().toString(); MessageAck ack = pendingAck.onMessageNack(activemqTx); if (ack != null) { sendToActiveMQ(ack, createResponseHandler(command)); @@ -443,9 +417,9 @@ public class ProtocolConverter { boolean acked = false; if (ackId != null) { - AckEntry pendingAck = this.pedingAcks.remove(ackId); + StompAckEntry pendingAck = this.pendingAcks.get(ackId); if (pendingAck != null) { - messageId = pendingAck.getMessageId(); + messageId = pendingAck.getMessageId().toString(); MessageAck ack = pendingAck.onMessageAck(activemqTx); if (ack != null) { sendToActiveMQ(ack, createResponseHandler(command)); @@ -526,8 +500,6 @@ public class ProtocolConverter { sub.onStompCommit(activemqTx); } - pedingAcks.clear(); - TransactionInfo tx = new TransactionInfo(); tx.setConnectionId(connectionId); tx.setTransactionId(activemqTx); @@ -557,8 +529,6 @@ public class ProtocolConverter { } } - pedingAcks.clear(); - TransactionInfo tx = new TransactionInfo(); tx.setConnectionId(connectionId); tx.setTransactionId(activemqTx); @@ -624,9 +594,9 @@ public class ProtocolConverter { StompSubscription stompSubscription; if (!consumerInfo.isBrowser()) { - stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); + stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker); } else { - stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); + stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker); } stompSubscription.setDestination(actualDest); @@ -845,6 +815,7 @@ public class ProtocolConverter { protected void onStompDisconnect(StompFrame command) throws ProtocolException { if (connected.get()) { + LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size()); sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); connected.set(false); @@ -880,19 +851,7 @@ public class ProtocolConverter { MessageDispatch md = (MessageDispatch)command; StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); if (sub != null) { - String ackId = null; - if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) { - AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); - ackId = this.ACK_ID_GENERATOR.generateId(); - this.pedingAcks.put(ackId, pendingAck); - } - try { - sub.onMessageDispatch(md, ackId); - } catch (Exception ex) { - if (ackId != null) { - this.pedingAcks.remove(ackId); - } - } + sub.onMessageDispatch(md); } } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { stompTransport.sendToStomp(ping); @@ -1052,26 +1011,15 @@ public class ProtocolConverter { return result; } - /** - * Remove all pending acknowledgement markers that are batched into the single - * client acknowledge operation. - * - * @param subscription - * The STOMP Subscription that has performed a client acknowledge. - * @param msgIdsToRemove - * List of message IDs that are bound to the subscription that has ack'd - */ - protected void afterClientAck(StompSubscription subscription, ArrayList msgIdsToRemove) { - int count = 0; + boolean isStomp10() { + return version.equals(Stomp.V1_0); + } - for (Map.Entry entry : this.pedingAcks.entrySet()){ - AckEntry actEntry = entry.getValue(); - if (msgIdsToRemove.contains(actEntry.messageId)) { - this.pedingAcks.remove(entry.getKey()); - count++; - } - } + boolean isStomp11() { + return version.equals(Stomp.V1_1); + } - LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count); + boolean isStomp12() { + return version.equals(Stomp.V1_2); } } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java new file mode 100644 index 0000000000..1edcb62d2a --- /dev/null +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.TransactionId; + +/** + * Tracker object for Messages that carry STOMP v1.2 ACK IDs + */ +public class StompAckEntry { + + private final String ackId; + private final MessageId messageId; + private final StompSubscription subscription; + private final MessageDispatch dispatch; + + public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) { + this.messageId = dispatch.getMessage().getMessageId(); + this.subscription = subscription; + this.ackId = ackId; + this.dispatch = dispatch; + } + + public MessageAck onMessageAck(TransactionId transactionId) { + return subscription.onStompMessageAck(messageId.toString(), transactionId); + } + + public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { + return subscription.onStompMessageNack(messageId.toString(), transactionId); + } + + public MessageId getMessageId() { + return this.messageId; + } + + public MessageDispatch getMessageDispatch() { + return this.dispatch; + } + + public String getAckId() { + return this.ackId; + } + + public StompSubscription getSubscription() { + return this.subscription; + } + + @Override + public String toString() { + return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((messageId == null) ? 0 : messageId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + StompAckEntry other = (StompAckEntry) obj; + if (messageId == null) { + if (other.messageId != null) { + return false; + } + } else if (!messageId.equals(other.messageId)) { + return false; + } + + return true; + } +} diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java index 9e267acb2f..12385721c4 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.stomp; import java.io.IOException; +import java.util.Map; import javax.jms.JMSException; @@ -27,15 +28,14 @@ import org.apache.activemq.command.TransactionId; public class StompQueueBrowserSubscription extends StompSubscription { - public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { - super(stompTransport, subscriptionId, consumerInfo, transformation); + public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map pendingAcks) { + super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks); } @Override - void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { - + void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { if (md.getMessage() != null) { - super.onMessageDispatch(md, ackId); + super.onMessageDispatch(md); } else { StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE); browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId()); @@ -52,5 +52,4 @@ public class StompQueueBrowserSubscription extends StompSubscription { public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException { throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription."); } - } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 0d8e308aaf..95fe986b25 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -17,12 +17,10 @@ package org.apache.activemq.transport.stomp; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; -import java.util.Map.Entry; import javax.jms.JMSException; @@ -34,6 +32,9 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; +import org.apache.activemq.util.IdGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Keeps track of the STOMP subscription so that acking is correctly done. @@ -42,6 +43,10 @@ import org.apache.activemq.command.TransactionId; */ public class StompSubscription { + private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class); + + private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); + public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL; @@ -50,27 +55,37 @@ public class StompSubscription { protected final String subscriptionId; protected final ConsumerInfo consumerInfo; - protected final LinkedHashMap dispatchedMessage = new LinkedHashMap<>(); - protected final LinkedList unconsumedMessage = new LinkedList<>(); + protected final Map dispatchedMessage = new LinkedHashMap<>(); + protected final Map pendingAcks; // STOMP v1.2 requires ACK ID tracking + protected final LinkedList transactedMessages = new LinkedList<>(); protected String ackMode = AUTO_ACK; protected ActiveMQDestination destination; protected String transformation; - public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { + public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map pendingAcks) { this.protocolConverter = stompTransport; this.subscriptionId = subscriptionId; this.consumerInfo = consumerInfo; this.transformation = transformation; + this.pendingAcks = pendingAcks; } - void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { + void onMessageDispatch(MessageDispatch md) throws IOException, JMSException { ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); - if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) { + + String ackId = null; + if (isClientAck() || isIndividualAck()) { + ackId = ACK_ID_GENERATOR.generateId(); + StompAckEntry pendingAck = new StompAckEntry(md, ackId, this); + synchronized (this) { - dispatchedMessage.put(message.getMessageId(), md); + dispatchedMessage.put(message.getMessageId(), pendingAck); } - } else if (ackMode.equals(AUTO_ACK)) { + if (protocolConverter.isStomp12()) { + this.pendingAcks.put(ackId, pendingAck); + } + } else if (isAutoAck()) { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); protocolConverter.getStompTransport().sendToActiveMQ(ack); } @@ -93,35 +108,48 @@ public class StompSubscription { command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); } - if (ackId != null) { + if (protocolConverter.isStomp12() && ackId != null) { command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId); } - protocolConverter.getStompTransport().sendToStomp(command); + try { + protocolConverter.getStompTransport().sendToStomp(command); + } catch (IOException ex) { + if (ackId != null) { + pendingAcks.remove(ackId); + } + throw ex; + } } synchronized void onStompAbort(TransactionId transactionId) { - unconsumedMessage.clear(); + // Restore the pending ACKs so that their ACK IDs are again valid for a client + // to operate on. + LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size()); + for (StompAckEntry ackEntry : transactedMessages) { + if (protocolConverter.isStomp12()) { + pendingAcks.put(ackEntry.getAckId(), ackEntry); + } + } + transactedMessages.clear(); } void onStompCommit(TransactionId transactionId) { MessageAck ack = null; synchronized (this) { - for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { - @SuppressWarnings("rawtypes") - Map.Entry entry = (Entry)iter.next(); - MessageDispatch msg = (MessageDispatch)entry.getValue(); - if (unconsumedMessage.contains(msg)) { - iter.remove(); + for (Iterator iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) { + StompAckEntry ackEntry = iterator.next(); + if (transactedMessages.contains(ackEntry)) { + iterator.remove(); } } // For individual Ack we already sent an Ack that will be applied on commit // we don't send a second standard Ack as that would produce an error. - if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) { - ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + if (!transactedMessages.isEmpty() && isClientAck()) { + ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size()); ack.setTransactionId(transactionId); - unconsumedMessage.clear(); + transactedMessages.clear(); } } // avoid contention with onMessageDispatch @@ -131,10 +159,10 @@ public class StompSubscription { } synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { - MessageId msgId = new MessageId(messageId); - if (!dispatchedMessage.containsKey(msgId)) { + final StompAckEntry ackEntry = dispatchedMessage.get(msgId); + if (ackEntry == null) { return null; } @@ -142,35 +170,33 @@ public class StompSubscription { ack.setDestination(consumerInfo.getDestination()); ack.setConsumerId(consumerInfo.getConsumerId()); - final ArrayList acknowledgedMessages = new ArrayList<>(); - - if (ackMode == CLIENT_ACK) { + if (isClientAck()) { if (transactionId == null) { ack.setAckType(MessageAck.STANDARD_ACK_TYPE); } else { ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); } int count = 0; - for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { + for (Iterator iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) { + StompAckEntry entry = iterator.next(); + MessageId current = entry.getMessageId(); - @SuppressWarnings("rawtypes") - Map.Entry entry = (Entry)iter.next(); - MessageId id = (MessageId)entry.getKey(); - MessageDispatch msg = (MessageDispatch)entry.getValue(); + if (entry.getAckId() != null) { + pendingAcks.remove(entry.getAckId()); + } if (transactionId != null) { - if (!unconsumedMessage.contains(msg)) { - unconsumedMessage.add(msg); + if (!transactedMessages.contains(entry)) { + transactedMessages.add(entry); count++; } } else { - acknowledgedMessages.add(id.toString()); - iter.remove(); + iterator.remove(); count++; } - if (id.equals(msgId)) { - ack.setLastMessageId(id); + if (current.equals(msgId)) { + ack.setLastMessageId(current); break; } } @@ -178,14 +204,15 @@ public class StompSubscription { if (transactionId != null) { ack.setTransactionId(transactionId); } - - this.protocolConverter.afterClientAck(this, acknowledgedMessages); - } else if (ackMode == INDIVIDUAL_ACK) { + } else if (isIndividualAck()) { + if (ackEntry.getAckId() != null) { + pendingAcks.remove(ackEntry.getAckId()); + } ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); ack.setMessageID(msgId); ack.setMessageCount(1); if (transactionId != null) { - unconsumedMessage.add(dispatchedMessage.get(msgId)); + transactedMessages.add(dispatchedMessage.get(msgId)); ack.setTransactionId(transactionId); } else { dispatchedMessage.remove(msgId); @@ -196,23 +223,29 @@ public class StompSubscription { } public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException { - MessageId msgId = new MessageId(messageId); if (!dispatchedMessage.containsKey(msgId)) { return null; } + final StompAckEntry ackEntry = dispatchedMessage.get(msgId); + + if (ackEntry.getAckId() != null) { + pendingAcks.remove(ackEntry.getAckId()); + } + MessageAck ack = new MessageAck(); ack.setDestination(consumerInfo.getDestination()); ack.setConsumerId(consumerInfo.getConsumerId()); ack.setAckType(MessageAck.POSION_ACK_TYPE); ack.setMessageID(msgId); if (transactionId != null) { - unconsumedMessage.add(dispatchedMessage.get(msgId)); + transactedMessages.add(ackEntry); ack.setTransactionId(transactionId); + } else { + dispatchedMessage.remove(msgId); } - dispatchedMessage.remove(msgId); return ack; } @@ -225,6 +258,18 @@ public class StompSubscription { this.ackMode = ackMode; } + public boolean isAutoAck() { + return ackMode.equals(AUTO_ACK); + } + + public boolean isClientAck() { + return ackMode.equals(CLIENT_ACK); + } + + public boolean isIndividualAck() { + return ackMode.equals(INDIVIDUAL_ACK); + } + public String getSubscriptionId() { return subscriptionId; } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java index b7560c7234..3944c500af 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -21,7 +21,9 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.Socket; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -40,7 +42,6 @@ public class Stomp12Test extends StompTestSupport { @Override public void setUp() throws Exception { - super.setUp(); stompConnect(); @@ -70,7 +71,6 @@ public class Stomp12Test extends StompTestSupport { @Test(timeout = 60000) public void testTelnetStyleSends() throws Exception { - stompConnection.setVersion(Stomp.V1_2); String connect = "CONNECT\r\n" + @@ -107,7 +107,6 @@ public class Stomp12Test extends StompTestSupport { @Test(timeout = 60000) public void testClientAckWithoutAckId() throws Exception { - stompConnection.setVersion(Stomp.V1_2); String connect = "STOMP\r\n" + @@ -150,18 +149,40 @@ public class Stomp12Test extends StompTestSupport { assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); assertEquals("1", received.getBody()); - String frame = "ACK\n" + "message-id:" + - received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID); + + // Put ACK ID in wrong header + String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); received = stompConnection.receive(); assertTrue(received.getAction().equals("ERROR")); LOG.info("Broker sent: " + received); + + // Now place it in the correct location and check it still works. + frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + frame = "DISCONNECT\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); } @Test(timeout = 60000) public void testClientAck() throws Exception { - stompConnection.setVersion(Stomp.V1_2); String connect = "STOMP\r\n" + @@ -255,11 +276,106 @@ public class Stomp12Test extends StompTestSupport { frame = "ACK\n" + "id:" + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + frame = "DISCONNECT\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + @Test(timeout = 60000) + public void testClientAckMultipleMessagesWithSingleAck() throws Exception { + final int MESSAGE_COUNT = 10; + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + // Send some messages + for (int n = 0; n < MESSAGE_COUNT; n++) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL; + stompConnection.sendFrame(message); + } + + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "ack:client\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize()); + + String lastAckId = null; + + for (int n = 0; n < MESSAGE_COUNT; n++) { + StompFrame received = stompConnection.receive(); + LOG.info("Broker sent: " + received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals(String.format("%d", n), received.getBody()); + + lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID); + } + + String frame = "ACK\n" + "id:" + lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + frame = "DISCONNECT\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); } @Test(timeout = 60000) public void testClientIndividualAck() throws Exception { - stompConnection.setVersion(Stomp.V1_2); String connect = "STOMP\r\n" + @@ -345,24 +461,117 @@ public class Stomp12Test extends StompTestSupport { assertTrue(received.getAction().equals("MESSAGE")); assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); assertEquals("1", received.getBody()); + String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID); - frame = "ACK\n" + "id:" + - received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); received = stompConnection.receive(); assertTrue(received.getAction().equals("MESSAGE")); assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); assertEquals("3", received.getBody()); + String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID); - frame = "ACK\n" + "id:" + - received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + frame = "DISCONNECT\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } @Test(timeout = 60000) - public void testQueueBrowerSubscription() throws Exception { + public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception { + final int MESSAGE_COUNT = 50; + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + // Send some messages + for (int n = 0; n < MESSAGE_COUNT; n++) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL; + stompConnection.sendFrame(message); + } + + // Subscribe to the queue + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "activemq.prefetchSize:1\n" + + "ack:client-individual\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + // Receive all messages, each in their own transaction + // Ensure we don't have any errors + for (int n = 0; n < MESSAGE_COUNT; n++) { + StompFrame received = stompConnection.receive(); + LOG.info("Broker sent: " + received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals(String.format("%d", n), received.getBody()); + + // Ack & Commit the first message + String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(begin); + + String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(commit); + } + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); + } + + @Test(timeout = 60000) + public void testQueueBrowerSubscription() throws Exception { final int MSG_COUNT = 10; String connectFrame = "STOMP\n" + @@ -523,7 +732,6 @@ public class Stomp12Test extends StompTestSupport { @Test(timeout = 60000) public void testSubscribeWithNoId() throws Exception { - String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + @@ -571,7 +779,7 @@ public class Stomp12Test extends StompTestSupport { long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage(); - for(int i = 0; i < MSG_COUNT; ++i) { + for (int i = 0; i < MSG_COUNT; ++i) { String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:0\n" + "myXkProp:" + bigProp + "\n"+ @@ -593,11 +801,372 @@ public class Stomp12Test extends StompTestSupport { "id:12345\n" + "browser:true\n\n" + Stomp.NULL; stompConnection.sendFrame(subscribe); - for(int i = 0; i < MSG_COUNT; ++i) { + for (int i = 0; i < MSG_COUNT; ++i) { StompFrame message = stompConnection.receive(); assertEquals(Stomp.Responses.MESSAGE, message.getAction()); assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION)); } + } + @Test(timeout = 60000) + public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception { + doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false); + } + + @Test(timeout = 60000) + public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception { + doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true); + } + + private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception { + final int MESSAGE_COUNT = 10; + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + // Send some messages + for (int n = 0; n < MESSAGE_COUNT; n++) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL; + stompConnection.sendFrame(message); + } + + // Subscribe to the queue + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "activemq.prefetchSize:1\n" + + "ack:client-individual\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + // Start a TX that will later be aborted. + String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(begin); + + List ackIds = new ArrayList<>(MESSAGE_COUNT); + + for (int n = 0; n < MESSAGE_COUNT; n++) { + StompFrame received = stompConnection.receive(); + LOG.info("Broker sent: " + received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals(String.format("%d", n), received.getBody()); + + ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID)); + + String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + + received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(commit); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + for (String ackId : ackIds) { + if (nack) { + String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } else { + String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); + } + + @Test(timeout = 60000) + public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception { + doTestMessagesRetirementAfterTransactionAbortClientAckMode(false); + } + + @Test(timeout = 60000) + public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception { + doTestMessagesRetirementAfterTransactionAbortClientAckMode(true); + } + + private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception { + final int MESSAGE_COUNT = 10; + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + // Send some messages + for (int n = 0; n < MESSAGE_COUNT; n++) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL; + stompConnection.sendFrame(message); + } + + // Subscribe to the queue + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" + + "ack:client\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + // Start a TX that will later be aborted. + String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(begin); + + List ackIds = new ArrayList<>(MESSAGE_COUNT); + + for (int n = 0; n < MESSAGE_COUNT; n++) { + StompFrame received = stompConnection.receive(); + LOG.info("Broker sent: " + received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals(String.format("%d", n), received.getBody()); + + ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID)); + } + + // Client ACK that enlists all messages in the TX + String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(commit); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + for (String ackId : ackIds) { + if (nack) { + frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } else { + frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); + } + + @Test(timeout = 60000) + public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception { + doTestMixedAckNackWithMessageAckIds(false); + } + + @Test(timeout = 60000) + public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception { + doTestMixedAckNackWithMessageAckIds(true); + } + + public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception { + + final int MESSAGE_COUNT = 20; + + stompConnection.setVersion(Stomp.V1_2); + + String connect = "STOMP\r\n" + + "accept-version:1.2\r\n" + + "login:system\r\n" + + "passcode:manager\r\n" + + "\r\n" + + "\u0000\r\n"; + + stompConnection.sendFrame(connect); + + String f = stompConnection.receiveFrame(); + LOG.info("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + assertTrue(f.indexOf("version:1.2") >= 0); + assertTrue(f.indexOf("session:") >= 0); + + // Send some messages + for (int n = 0; n < MESSAGE_COUNT; n++) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL; + stompConnection.sendFrame(message); + } + + // Subscribe to the queue + String subscribe = "SUBSCRIBE\n" + + "id:1\n" + + "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" + + "ack:" + (individual ? "client-individual" : "client") + "\n" + + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(subscribe); + + StompFrame receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + String receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("1", receiptId); + + List ackIds = new ArrayList<>(MESSAGE_COUNT); + + for (int n = 0; n < MESSAGE_COUNT; n++) { + StompFrame received = stompConnection.receive(); + LOG.info("Broker sent: " + received); + assertTrue(received.getAction().equals("MESSAGE")); + assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID)); + assertEquals(String.format("%d", n), received.getBody()); + + ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID)); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + boolean nack = false; + + for (String ackId : ackIds) { + if (nack) { + String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + nack = !nack; + } else { + String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + nack = !nack; + } + + receipt = stompConnection.receive(); + LOG.info("Broker sent: " + receipt); + assertTrue(receipt.getAction().startsWith("RECEIPT")); + receiptId = receipt.getHeaders().get("receipt-id"); + assertEquals("2", receiptId); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToQueue(getQueueName()).getQueueSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25))); + + String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() <= 1; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)); } }