From 37ad85e3bdd9c8f27a3197342fb06008053638c9 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 5 Feb 2013 18:30:33 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4297 -resolve intermittent hang/fail of stomp tests git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442688 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/StompSubscription.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 714bce914b..d4492e1bf6 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -107,20 +107,26 @@ public class StompSubscription { unconsumedMessage.clear(); } - synchronized void onStompCommit(TransactionId transactionId) { - for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { - @SuppressWarnings("rawtypes") - Map.Entry entry = (Entry)iter.next(); - MessageDispatch msg = (MessageDispatch)entry.getValue(); - if (unconsumedMessage.contains(msg)) { - iter.remove(); + void onStompCommit(TransactionId transactionId) { + MessageAck ack = null; + synchronized (this) { + for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { + @SuppressWarnings("rawtypes") + Map.Entry entry = (Entry)iter.next(); + MessageDispatch msg = (MessageDispatch)entry.getValue(); + if (unconsumedMessage.contains(msg)) { + iter.remove(); + } + } + + if (!unconsumedMessage.isEmpty()) { + ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + unconsumedMessage.clear(); } } - - if (!unconsumedMessage.isEmpty()) { - MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + // avoid contention with onMessageDispatch + if (ack != null) { protocolConverter.getStompTransport().sendToActiveMQ(ack); - unconsumedMessage.clear(); } }