AMQ-7218 Fix loss of Ack id from tracking after TX commit / abort

Ensure that we properly track Ack IDs across TX commit and abort
operations and only clear out values enlisted in the TX on commit and
re-acquire the Ack Ids on TX abort.
This commit is contained in:
Timothy Bish 2019-05-29 17:25:30 -04:00
parent d0fd46efcd
commit 063d24e6d6
5 changed files with 800 additions and 138 deletions

View File

@ -22,7 +22,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -116,8 +116,10 @@ public class ProtocolConverter {
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
private final StompTransport stompTransport;
private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
// Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id
private final ConcurrentMap<String, StompAckEntry> pendingAcksTracker = new ConcurrentHashMap<>();
// Read-Only view used in this class to enforce the separation of read vs update of the global index.
private final Map<String, StompAckEntry> pendingAcks = Collections.unmodifiableMap(pendingAcksTracker);
private final Object commnadIdMutex = new Object();
private int lastCommandId;
@ -131,34 +133,6 @@ public class ProtocolConverter {
private float hbGracePeriodMultiplier = 1.0f;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
private static class AckEntry {
private final String messageId;
private final StompSubscription subscription;
public AckEntry(String messageId, StompSubscription subscription) {
this.messageId = messageId;
this.subscription = subscription;
}
public MessageAck onMessageAck(TransactionId transactionId) {
return subscription.onStompMessageAck(messageId, transactionId);
}
public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
return subscription.onStompMessageNack(messageId, transactionId);
}
public String getMessageId() {
return this.messageId;
}
@SuppressWarnings("unused")
public StompSubscription getSubscription() {
return this.subscription;
}
}
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.brokerContext = brokerContext;
@ -387,9 +361,9 @@ public class ProtocolConverter {
boolean nacked = false;
if (ackId != null) {
AckEntry pendingAck = this.pedingAcks.remove(ackId);
StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
messageId = pendingAck.getMessageId();
messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageNack(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@ -443,9 +417,9 @@ public class ProtocolConverter {
boolean acked = false;
if (ackId != null) {
AckEntry pendingAck = this.pedingAcks.remove(ackId);
StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
messageId = pendingAck.getMessageId();
messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageAck(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@ -526,8 +500,6 @@ public class ProtocolConverter {
sub.onStompCommit(activemqTx);
}
pedingAcks.clear();
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@ -557,8 +529,6 @@ public class ProtocolConverter {
}
}
pedingAcks.clear();
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@ -624,9 +594,9 @@ public class ProtocolConverter {
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
} else {
stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
}
stompSubscription.setDestination(actualDest);
@ -845,6 +815,7 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws ProtocolException {
if (connected.get()) {
LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size());
sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
@ -880,19 +851,7 @@ public class ProtocolConverter {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
String ackId = null;
if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
ackId = this.ACK_ID_GENERATOR.generateId();
this.pedingAcks.put(ackId, pendingAck);
}
try {
sub.onMessageDispatch(md, ackId);
} catch (Exception ex) {
if (ackId != null) {
this.pedingAcks.remove(ackId);
}
}
sub.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
@ -1052,26 +1011,15 @@ public class ProtocolConverter {
return result;
}
/**
* Remove all pending acknowledgement markers that are batched into the single
* client acknowledge operation.
*
* @param subscription
* The STOMP Subscription that has performed a client acknowledge.
* @param msgIdsToRemove
* List of message IDs that are bound to the subscription that has ack'd
*/
protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
int count = 0;
for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
AckEntry actEntry = entry.getValue();
if (msgIdsToRemove.contains(actEntry.messageId)) {
this.pedingAcks.remove(entry.getKey());
count++;
}
boolean isStomp10() {
return version.equals(Stomp.V1_0);
}
LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
boolean isStomp11() {
return version.equals(Stomp.V1_1);
}
boolean isStomp12() {
return version.equals(Stomp.V1_2);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
/**
* Tracker object for Messages that carry STOMP v1.2 ACK IDs
*/
public class StompAckEntry {
private final String ackId;
private final MessageId messageId;
private final StompSubscription subscription;
private final MessageDispatch dispatch;
public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) {
this.messageId = dispatch.getMessage().getMessageId();
this.subscription = subscription;
this.ackId = ackId;
this.dispatch = dispatch;
}
public MessageAck onMessageAck(TransactionId transactionId) {
return subscription.onStompMessageAck(messageId.toString(), transactionId);
}
public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
return subscription.onStompMessageNack(messageId.toString(), transactionId);
}
public MessageId getMessageId() {
return this.messageId;
}
public MessageDispatch getMessageDispatch() {
return this.dispatch;
}
public String getAckId() {
return this.ackId;
}
public StompSubscription getSubscription() {
return this.subscription;
}
@Override
public String toString() {
return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
StompAckEntry other = (StompAckEntry) obj;
if (messageId == null) {
if (other.messageId != null) {
return false;
}
} else if (!messageId.equals(other.messageId)) {
return false;
}
return true;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.util.Map;
import javax.jms.JMSException;
@ -27,15 +28,14 @@ import org.apache.activemq.command.TransactionId;
public class StompQueueBrowserSubscription extends StompSubscription {
public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
super(stompTransport, subscriptionId, consumerInfo, transformation);
public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks);
}
@Override
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
if (md.getMessage() != null) {
super.onMessageDispatch(md, ackId);
super.onMessageDispatch(md);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
@ -52,5 +52,4 @@ public class StompQueueBrowserSubscription extends StompSubscription {
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
}
}

View File

@ -17,12 +17,10 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.JMSException;
@ -34,6 +32,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
@ -42,6 +43,10 @@ import org.apache.activemq.command.TransactionId;
*/
public class StompSubscription {
private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class);
private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
@ -50,27 +55,37 @@ public class StompSubscription {
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
protected final Map<MessageId, StompAckEntry> dispatchedMessage = new LinkedHashMap<>();
protected final Map<String, StompAckEntry> pendingAcks; // STOMP v1.2 requires ACK ID tracking
protected final LinkedList<StompAckEntry> transactedMessages = new LinkedList<>();
protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination;
protected String transformation;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
this.protocolConverter = stompTransport;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo;
this.transformation = transformation;
this.pendingAcks = pendingAcks;
}
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
String ackId = null;
if (isClientAck() || isIndividualAck()) {
ackId = ACK_ID_GENERATOR.generateId();
StompAckEntry pendingAck = new StompAckEntry(md, ackId, this);
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
dispatchedMessage.put(message.getMessageId(), pendingAck);
}
} else if (ackMode.equals(AUTO_ACK)) {
if (protocolConverter.isStomp12()) {
this.pendingAcks.put(ackId, pendingAck);
}
} else if (isAutoAck()) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
@ -93,35 +108,48 @@ public class StompSubscription {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
if (ackId != null) {
if (protocolConverter.isStomp12() && ackId != null) {
command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
}
try {
protocolConverter.getStompTransport().sendToStomp(command);
} catch (IOException ex) {
if (ackId != null) {
pendingAcks.remove(ackId);
}
throw ex;
}
}
synchronized void onStompAbort(TransactionId transactionId) {
unconsumedMessage.clear();
// Restore the pending ACKs so that their ACK IDs are again valid for a client
// to operate on.
LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size());
for (StompAckEntry ackEntry : transactedMessages) {
if (protocolConverter.isStomp12()) {
pendingAcks.put(ackEntry.getAckId(), ackEntry);
}
}
transactedMessages.clear();
}
void onStompCommit(TransactionId transactionId) {
MessageAck ack = null;
synchronized (this) {
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
@SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (unconsumedMessage.contains(msg)) {
iter.remove();
for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
StompAckEntry ackEntry = iterator.next();
if (transactedMessages.contains(ackEntry)) {
iterator.remove();
}
}
// For individual Ack we already sent an Ack that will be applied on commit
// we don't send a second standard Ack as that would produce an error.
if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
if (!transactedMessages.isEmpty() && isClientAck()) {
ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size());
ack.setTransactionId(transactionId);
unconsumedMessage.clear();
transactedMessages.clear();
}
}
// avoid contention with onMessageDispatch
@ -131,10 +159,10 @@ public class StompSubscription {
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
if (ackEntry == null) {
return null;
}
@ -142,35 +170,33 @@ public class StompSubscription {
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
final ArrayList<String> acknowledgedMessages = new ArrayList<>();
if (ackMode == CLIENT_ACK) {
if (isClientAck()) {
if (transactionId == null) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
} else {
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
}
int count = 0;
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
StompAckEntry entry = iterator.next();
MessageId current = entry.getMessageId();
@SuppressWarnings("rawtypes")
Map.Entry entry = (Entry)iter.next();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (entry.getAckId() != null) {
pendingAcks.remove(entry.getAckId());
}
if (transactionId != null) {
if (!unconsumedMessage.contains(msg)) {
unconsumedMessage.add(msg);
if (!transactedMessages.contains(entry)) {
transactedMessages.add(entry);
count++;
}
} else {
acknowledgedMessages.add(id.toString());
iter.remove();
iterator.remove();
count++;
}
if (id.equals(msgId)) {
ack.setLastMessageId(id);
if (current.equals(msgId)) {
ack.setLastMessageId(current);
break;
}
}
@ -178,14 +204,15 @@ public class StompSubscription {
if (transactionId != null) {
ack.setTransactionId(transactionId);
}
this.protocolConverter.afterClientAck(this, acknowledgedMessages);
} else if (ackMode == INDIVIDUAL_ACK) {
} else if (isIndividualAck()) {
if (ackEntry.getAckId() != null) {
pendingAcks.remove(ackEntry.getAckId());
}
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
ack.setMessageCount(1);
if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId));
transactedMessages.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
} else {
dispatchedMessage.remove(msgId);
@ -196,23 +223,29 @@ public class StompSubscription {
}
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
if (ackEntry.getAckId() != null) {
pendingAcks.remove(ackEntry.getAckId());
}
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.POSION_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId));
transactedMessages.add(ackEntry);
ack.setTransactionId(transactionId);
}
} else {
dispatchedMessage.remove(msgId);
}
return ack;
}
@ -225,6 +258,18 @@ public class StompSubscription {
this.ackMode = ackMode;
}
public boolean isAutoAck() {
return ackMode.equals(AUTO_ACK);
}
public boolean isClientAck() {
return ackMode.equals(CLIENT_ACK);
}
public boolean isIndividualAck() {
return ackMode.equals(INDIVIDUAL_ACK);
}
public String getSubscriptionId() {
return subscriptionId;
}

View File

@ -21,7 +21,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@ -40,7 +42,6 @@ public class Stomp12Test extends StompTestSupport {
@Override
public void setUp() throws Exception {
super.setUp();
stompConnect();
@ -70,7 +71,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testTelnetStyleSends() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "CONNECT\r\n" +
@ -107,7 +107,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testClientAckWithoutAckId() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@ -150,18 +149,40 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
String frame = "ACK\n" + "message-id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
// Put ACK ID in wrong header
String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);
// Now place it in the correct location and check it still works.
frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testClientAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@ -255,11 +276,106 @@ public class Stomp12Test extends StompTestSupport {
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testClientAckMultipleMessagesWithSingleAck() throws Exception {
final int MESSAGE_COUNT = 10;
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
// Send some messages
for (int n = 0; n < MESSAGE_COUNT; n++) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
stompConnection.sendFrame(message);
}
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"ack:client\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize());
String lastAckId = null;
for (int n = 0; n < MESSAGE_COUNT; n++) {
StompFrame received = stompConnection.receive();
LOG.info("Broker sent: " + received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals(String.format("%d", n), received.getBody());
lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
}
String frame = "ACK\n" + "id:" + lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
}
@Test(timeout = 60000)
public void testClientIndividualAck() throws Exception {
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@ -345,24 +461,117 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("3", received.getBody());
String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testQueueBrowerSubscription() throws Exception {
public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception {
final int MESSAGE_COUNT = 50;
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
// Send some messages
for (int n = 0; n < MESSAGE_COUNT; n++) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
stompConnection.sendFrame(message);
}
// Subscribe to the queue
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"activemq.prefetchSize:1\n" +
"ack:client-individual\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
// Receive all messages, each in their own transaction
// Ensure we don't have any errors
for (int n = 0; n < MESSAGE_COUNT; n++) {
StompFrame received = stompConnection.receive();
LOG.info("Broker sent: " + received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals(String.format("%d", n), received.getBody());
// Ack & Commit the first message
String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(begin);
String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(commit);
}
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
@Test(timeout = 60000)
public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
String connectFrame = "STOMP\n" +
@ -523,7 +732,6 @@ public class Stomp12Test extends StompTestSupport {
@Test(timeout = 60000)
public void testSubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
@ -571,7 +779,7 @@ public class Stomp12Test extends StompTestSupport {
long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
for(int i = 0; i < MSG_COUNT; ++i) {
for (int i = 0; i < MSG_COUNT; ++i) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:0\n" +
"myXkProp:" + bigProp + "\n"+
@ -593,11 +801,372 @@ public class Stomp12Test extends StompTestSupport {
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
for(int i = 0; i < MSG_COUNT; ++i) {
for (int i = 0; i < MSG_COUNT; ++i) {
StompFrame message = stompConnection.receive();
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
}
}
@Test(timeout = 60000)
public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false);
}
@Test(timeout = 60000)
public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true);
}
private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception {
final int MESSAGE_COUNT = 10;
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
// Send some messages
for (int n = 0; n < MESSAGE_COUNT; n++) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
stompConnection.sendFrame(message);
}
// Subscribe to the queue
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"activemq.prefetchSize:1\n" +
"ack:client-individual\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
// Start a TX that will later be aborted.
String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(begin);
List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
for (int n = 0; n < MESSAGE_COUNT; n++) {
StompFrame received = stompConnection.receive();
LOG.info("Broker sent: " + received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals(String.format("%d", n), received.getBody());
ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(commit);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
for (String ackId : ackIds) {
if (nack) {
String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
} else {
String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
@Test(timeout = 60000)
public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception {
doTestMessagesRetirementAfterTransactionAbortClientAckMode(false);
}
@Test(timeout = 60000)
public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception {
doTestMessagesRetirementAfterTransactionAbortClientAckMode(true);
}
private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception {
final int MESSAGE_COUNT = 10;
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
// Send some messages
for (int n = 0; n < MESSAGE_COUNT; n++) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
stompConnection.sendFrame(message);
}
// Subscribe to the queue
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
"ack:client\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
// Start a TX that will later be aborted.
String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(begin);
List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
for (int n = 0; n < MESSAGE_COUNT; n++) {
StompFrame received = stompConnection.receive();
LOG.info("Broker sent: " + received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals(String.format("%d", n), received.getBody());
ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
}
// Client ACK that enlists all messages in the TX
String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(commit);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
for (String ackId : ackIds) {
if (nack) {
frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
} else {
frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
@Test(timeout = 60000)
public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception {
doTestMixedAckNackWithMessageAckIds(false);
}
@Test(timeout = 60000)
public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception {
doTestMixedAckNackWithMessageAckIds(true);
}
public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
final int MESSAGE_COUNT = 20;
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
"accept-version:1.2\r\n" +
"login:system\r\n" +
"passcode:manager\r\n" +
"\r\n" +
"\u0000\r\n";
stompConnection.sendFrame(connect);
String f = stompConnection.receiveFrame();
LOG.info("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("version:1.2") >= 0);
assertTrue(f.indexOf("session:") >= 0);
// Send some messages
for (int n = 0; n < MESSAGE_COUNT; n++) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
stompConnection.sendFrame(message);
}
// Subscribe to the queue
String subscribe = "SUBSCRIBE\n" +
"id:1\n" +
"activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
"ack:" + (individual ? "client-individual" : "client") + "\n" +
"destination:/queue/" + getQueueName() + "\n" +
"receipt:1\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
StompFrame receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
String receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("1", receiptId);
List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
for (int n = 0; n < MESSAGE_COUNT; n++) {
StompFrame received = stompConnection.receive();
LOG.info("Broker sent: " + received);
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals(String.format("%d", n), received.getBody());
ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
boolean nack = false;
for (String ackId : ackIds) {
if (nack) {
String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
nack = !nack;
} else {
String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
nack = !nack;
}
receipt = stompConnection.receive();
LOG.info("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
receiptId = receipt.getHeaders().get("receipt-id");
assertEquals("2", receiptId);
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToQueue(getQueueName()).getQueueSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
}