fix for https://issues.apache.org/activemq/browse/AMQ-2280 - stomp transactions and multiple destinations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@801916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-08-07 09:18:23 +00:00
parent c866f88d23
commit 1a6279e170
2 changed files with 46 additions and 15 deletions

View File

@ -123,18 +123,19 @@ public class StompSubscription {
synchronized void onStompCommit(TransactionId transactionId) {
// ack all messages
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
ack.setMessageCount(unconsumedMessage.size());
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
// clear lists
unconsumedMessage.clear();
dispatchedMessage.clear();
if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
ack.setMessageCount(unconsumedMessage.size());
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
// clear lists
unconsumedMessage.clear();
}
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@ -169,9 +170,8 @@ public class StompSubscription {
if (transactionId != null) {
if (!unconsumedMessage.contains(msg))
unconsumedMessage.add(msg);
} else {
iter.remove();
}
iter.remove();
count++;

View File

@ -998,7 +998,38 @@ public class StompTest extends CombinationTestSupport {
stompDisconnect();
}
}
public void testTransactionsWithMultipleDestinations() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
headers.put("activemq.exclusive", "true");
stompConnection.subscribe("/queue/test1", "client", headers);
stompConnection.begin("ID:tx1");
headers.clear();
headers.put("receipt", "ID:msg1");
stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
stompConnection.commit("ID:tx1");
// make sure connection is active after commit
Thread.sleep(1000);
stompConnection.send("/queue/test1", "another message");
StompFrame frame = stompConnection.receive(500);
System.out.println(frame);
assertNotNull(frame);
stompConnection.disconnect();
}
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;