https://issues.apache.org/jira/browse/AMQ-4107 - apply patch - resolved intermittent failure of TwoMulticastDiscoveryBrokerTopicSendReceiveTest

This commit is contained in:
gtully 2015-01-12 11:59:51 +00:00
parent 3e007d89a2
commit 8dbb48a23f
2 changed files with 20 additions and 23 deletions

View File

@ -106,26 +106,26 @@ public class TopicSubscription extends AbstractSubscription {
// locator /w the message. // locator /w the message.
node = new IndirectMessageReference(node.getMessage()); node = new IndirectMessageReference(node.getMessage());
enqueueCounter.incrementAndGet(); enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty()) { synchronized (matchedListMutex) {
// if maximumPendingMessages is set we will only discard messages which if (!isFull() && matched.isEmpty()) {
// have not been dispatched (i.e. we allow the prefetch buffer to be filled) // if maximumPendingMessages is set we will only discard messages which
dispatch(node); // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
setSlowConsumer(false); dispatch(node);
} else { setSlowConsumer(false);
if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { } else {
// Slow consumers should log and set their state as such. if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
if (!isSlowConsumer()) { // Slow consumers should log and set their state as such.
LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); if (!isSlowConsumer()) {
setSlowConsumer(true); LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
for (Destination dest: destinations) { setSlowConsumer(true);
dest.slowConsumer(getContext(), this); for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
}
} }
} }
} if (maximumPendingMessages != 0) {
if (maximumPendingMessages != 0) { boolean warnedAboutWait = false;
boolean warnedAboutWait = false; while (active) {
while (active) {
synchronized (matchedListMutex) {
while (matched.isFull()) { while (matched.isFull()) {
if (getContext().getStopping().get()) { if (getContext().getStopping().get()) {
LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
@ -150,9 +150,6 @@ public class TopicSubscription extends AbstractSubscription {
break; break;
} }
} }
}
synchronized (matchedListMutex) {
// NOTE - be careful about the slaveBroker!
if (maximumPendingMessages > 0) { if (maximumPendingMessages > 0) {
// calculate the high water mark from which point we // calculate the high water mark from which point we
// will eagerly evict expired messages // will eagerly evict expired messages
@ -195,8 +192,8 @@ public class TopicSubscription extends AbstractSubscription {
} }
} }
} }
dispatchMatched();
} }
dispatchMatched();
} }
} }
} }

View File

@ -21,7 +21,7 @@ import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
/** /**
* * reproduced: https://issues.apache.org/jira/browse/AMQ-4107
*/ */
public class TwoMulticastDiscoveryBrokerTopicSendReceiveTest extends TwoBrokerTopicSendReceiveTest { public class TwoMulticastDiscoveryBrokerTopicSendReceiveTest extends TwoBrokerTopicSendReceiveTest {