From b0f24f34d0311e54e22bd6492010a102197c3a7e Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Mon, 17 Aug 2009 11:43:34 +0000 Subject: [PATCH] additional fix for https://issues.apache.org/activemq/browse/AMQ-1807 - fixing test case and making it work git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@804943 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 1 - .../transport/stomp/ProtocolConverter.java | 6 +- .../transport/stomp/StompConnection.java | 9 --- .../transport/stomp/StompSubscription.java | 55 +++++-------------- .../activemq/transport/stomp/StompTest.java | 18 +++--- 5 files changed, 25 insertions(+), 64 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index d4c26fed69..e51a8d7505 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -236,7 +236,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { dequeueCounter++; dispatched.remove(node); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); - prefetchExtension--; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index ffc398af72..dc3c34427f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -361,11 +361,7 @@ public class ProtocolConverter { } for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription sub = iter.next(); - try { - sub.onStompAbort(activemqTx); - } catch (Exception e) { - throw new ProtocolException("Transaction abort failed", false, e); - } + sub.onStompAbort(activemqTx); } TransactionInfo tx = new TransactionInfo(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java index cddd51a19f..db68061eaa 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java @@ -188,15 +188,6 @@ public class StompConnection { } public void abort(String transaction) throws Exception { - // discard all content on the wire before - // aborting the transaction - try { - StompFrame discarded = this.receive(100); - while (discarded != null) { - discarded = this.receive(100); - } - } catch (Exception e) { - } HashMap headers = new HashMap(); headers.put("transaction", transaction); StompFrame frame = new StompFrame("ABORT", headers); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 892e417844..b158b5be61 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -99,43 +99,20 @@ public class StompSubscription { protocolConverter.getTransportFilter().sendToStomp(command); } - synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException { - //ack all unacked messages - for (MessageDispatch md : dispatchedMessage.values()) { - if (!unconsumedMessage.contains(md)) { - MessageAck ack = new MessageAck(); - ack.setDestination(consumerInfo.getDestination()); - ack.setConsumerId(consumerInfo.getConsumerId()); - ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); - ack.setFirstMessageId(md.getMessage().getMessageId()); - ack.setLastMessageId(md.getMessage().getMessageId()); - ack.setMessageCount(1); - ack.setTransactionId(transactionId); - protocolConverter.getTransportFilter().sendToActiveMQ(ack); - unconsumedMessage.add(md); - } - } - // redeliver all unconsumed messages - for (MessageDispatch md : unconsumedMessage) { - onMessageDispatch(md); - } + synchronized void onStompAbort(TransactionId transactionId) { + unconsumedMessage.clear(); } synchronized void onStompCommit(TransactionId transactionId) { - // ack all messages - if (!unconsumedMessage.isEmpty()) { - MessageAck ack = new MessageAck(); - ack.setDestination(consumerInfo.getDestination()); - ack.setConsumerId(consumerInfo.getConsumerId()); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId()); - ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId()); - ack.setMessageCount(unconsumedMessage.size()); - ack.setTransactionId(transactionId); - protocolConverter.getTransportFilter().sendToActiveMQ(ack); - // clear lists - unconsumedMessage.clear(); + for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Entry)iter.next(); + MessageId id = (MessageId)entry.getKey(); + MessageDispatch msg = (MessageDispatch)entry.getValue(); + if (unconsumedMessage.contains(msg)) { + iter.remove(); + } } + unconsumedMessage.clear(); } synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { @@ -151,11 +128,7 @@ public class StompSubscription { ack.setConsumerId(consumerInfo.getConsumerId()); if (ackMode == CLIENT_ACK) { - if (transactionId != null) { - ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); - } else { - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); - } + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); int count = 0; for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { @@ -168,10 +141,12 @@ public class StompSubscription { } if (transactionId != null) { - if (!unconsumedMessage.contains(msg)) + if (!unconsumedMessage.contains(msg)) { unconsumedMessage.add(msg); + } + } else { + iter.remove(); } - iter.remove(); count++; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 17bdade714..8dc5e41d8b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -977,24 +977,24 @@ public class StompTest extends CombinationTestSupport { stompConnection.begin("tx2"); + // Previously delivered message need to get re-acked... + stompConnection.ack(frame, "tx2"); + stompConnection.ack(frame1, "tx2"); + StompFrame frame3 = stompConnection.receive(); - assertEquals(frame3.getBody(), "message 1"); + assertEquals(frame3.getBody(), "message 3"); stompConnection.ack(frame3, "tx2"); StompFrame frame4 = stompConnection.receive(); - assertEquals(frame4.getBody(), "message 2"); + assertEquals(frame4.getBody(), "message 4"); stompConnection.ack(frame4, "tx2"); - StompFrame frame5 = stompConnection.receive(); - assertEquals(frame5.getBody(), "message 3"); - stompConnection.ack(frame5, "tx2"); - stompConnection.commit("tx2"); stompConnection.begin("tx3"); - StompFrame frame6 = stompConnection.receive(); - assertEquals(frame6.getBody(), "message 4"); - stompConnection.ack(frame6, "tx3"); + StompFrame frame5 = stompConnection.receive(); + assertEquals(frame5.getBody(), "message 5"); + stompConnection.ack(frame5, "tx3"); stompConnection.commit("tx3"); stompDisconnect();