https://issues.apache.org/jira/browse/AMQ-4297 -resolve intermittent hang/fail of stomp tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-02-05 18:30:33 +00:00
parent 4650f998db
commit 37ad85e3bd
1 changed files with 17 additions and 11 deletions

View File

@ -107,20 +107,26 @@ public class StompSubscription {
unconsumedMessage.clear(); unconsumedMessage.clear();
} }
synchronized void onStompCommit(TransactionId transactionId) { void onStompCommit(TransactionId transactionId) {
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { MessageAck ack = null;
@SuppressWarnings("rawtypes") synchronized (this) {
Map.Entry entry = (Entry)iter.next(); for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
MessageDispatch msg = (MessageDispatch)entry.getValue(); @SuppressWarnings("rawtypes")
if (unconsumedMessage.contains(msg)) { Map.Entry entry = (Entry)iter.next();
iter.remove(); MessageDispatch msg = (MessageDispatch)entry.getValue();
if (unconsumedMessage.contains(msg)) {
iter.remove();
}
}
if (!unconsumedMessage.isEmpty()) {
ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
unconsumedMessage.clear();
} }
} }
// avoid contention with onMessageDispatch
if (!unconsumedMessage.isEmpty()) { if (ack != null) {
MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
protocolConverter.getStompTransport().sendToActiveMQ(ack); protocolConverter.getStompTransport().sendToActiveMQ(ack);
unconsumedMessage.clear();
} }
} }