https://issues.apache.org/jira/browse/AMQ-3805 - further resolution, dispatch and activate could clash over the audit resulting in skipped messages in the batch, the end result being our of order dispatch. Additional test and fix up of use of cache and sync between durable sub and prefetch sub

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1328413 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-04-20 15:36:10 +00:00
parent 74bed6b577
commit 6b4d077bdf
5 changed files with 172 additions and 45 deletions

View File

@ -167,40 +167,37 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
active.set(false);
offlineTimestamp.set(System.currentTimeMillis());
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
synchronized (pendingLock) {
pending.stop();
}
for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
if (!keepDurableSubsActive) {
topic.deactivate(context, this);
} else {
topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
}
}
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
} else {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
}
if (keepDurableSubsActive && pending.isTransient()) {
synchronized (pending) {
pending.addMessageFirst(node);
pending.rollback(node.getMessageId());
synchronized (dispatchLock) {
for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
if (!keepDurableSubsActive) {
topic.deactivate(context, this);
} else {
topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
}
}
} else {
node.decrementReferenceCount();
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
} else {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
}
if (keepDurableSubsActive && pending.isTransient()) {
pending.addMessageFirst(node);
pending.rollback(node.getMessageId());
} else {
node.decrementReferenceCount();
}
}
dispatched.clear();
}
}
synchronized(dispatched) {
dispatched.clear();
}
if (!keepDurableSubsActive && pending.isTransient()) {
synchronized (pending) {
if (!keepDurableSubsActive && pending.isTransient()) {
try {
pending.reset();
while (pending.hasNext()) {
@ -286,7 +283,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
* Release any references that we are holding.
*/
public void destroy() {
synchronized (pending) {
synchronized (pendingLock) {
try {
pending.reset();
@ -300,7 +297,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.clear();
}
}
synchronized(dispatched) {
synchronized (dispatchLock) {
for (MessageReference node : dispatched) {
node.decrementReferenceCount();
}

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -57,7 +56,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected final Scheduler scheduler;
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
protected final AtomicInteger prefetchExtension = new AtomicInteger();
protected boolean usePrefetchExtension = true;
protected long enqueueCounter;
@ -67,7 +66,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
protected final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
protected final Object dispatchLock = new Object();
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
@ -445,7 +444,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* Checks an ack versus the contents of the dispatched list.
*
* called with dispatchLock held
* @param ack
* @throws JMSException if it does not match
*/
@ -658,6 +657,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.setMaxBatchSize(numberToDispatch);
}
// called with dispatchLock held
protected boolean dispatch(final MessageReference node) throws IOException {
final Message message = node.getMessage();
if (message == null) {

View File

@ -172,6 +172,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void addMessageLast(MessageReference node) throws Exception {
boolean disableCache = false;
if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
if (LOG.isTraceEnabled()) {
@ -180,10 +181,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
setCacheEnabled(true);
}
if (isCacheEnabled()) {
recoverMessage(node.getMessage(),true);
lastCachedId = node.getMessageId();
if (recoverMessage(node.getMessage(),true)) {
lastCachedId = node.getMessageId();
} else {
// failed to recover, possible duplicate from concurrent dispatchPending,
// lets not recover further in case of out of order
disableCache = true;
}
}
} else if (isCacheEnabled()) {
} else {
disableCache = true;
}
if (disableCache && isCacheEnabled()) {
setCacheEnabled(false);
// sync with store on disabling the cache
if (lastCachedId != null) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.io.File;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -91,7 +92,8 @@ public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
static final Vector<Throwable> exceptions = new Vector<Throwable>();
// long form of test that found https://issues.apache.org/jira/browse/AMQ-3805
@Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate")
@Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate"
+ " and org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testOrderOnActivateDeactivate")
@Test
public void testProcess() {
try {
@ -125,12 +127,16 @@ public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
//allow the clients to unsubscribe before finishing
clientManager.setEnd(true);
try {
Thread.sleep(600000);
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
exit("ProcessTest.testProcess failed.", e);
}
server.done = true;
try {
server.join(60*1000);
} catch (Exception ignored) {}
processLock.writeLock().lock();
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
LOG.info("DONE.");
@ -177,7 +183,7 @@ public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
int transRover = 0;
int messageRover = 0;
public volatile int committingTransaction = -1;
public boolean done = false;
public Server() {
super("Server");
setPriority(Thread.MIN_PRIORITY);
@ -187,7 +193,7 @@ public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
@Override
public void run() {
try {
while (true) {
while (!done) {
Thread.sleep(1000);

View File

@ -102,6 +102,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker.getManagementContext().setCreateConnector(false);
broker.setAdvisorySupport(false);
broker.setKeepDurableSubsActive(keepDurableSubsActive);
broker.addConnector("tcp://0.0.0.0:0");
if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry();
@ -1056,6 +1057,119 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
public void testOrderOnActivateDeactivate() throws Exception {
for (int i=0;i<10;i++) {
LOG.info("Iteration: " + i);
doTestOrderOnActivateDeactivate();
broker.stop();
createBroker(true /*deleteAllMessages*/);
}
}
public void doTestOrderOnActivateDeactivate() throws Exception {
final int messageCount = 1000;
Connection con = null;
Session session = null;
final int numConsumers = 4;
for (int i = 0; i <= numConsumers; i++) {
con = createConnection("cli" + i);
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
}
final String url = "failover:(tcp://localhost:"
+ (broker.getTransportConnectors().get(1).getConnectUri()).getPort()
+ "?wireFormat.maxInactivityDuration=0)?"
+ "jms.watchTopicAdvisories=false&"
+ "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+ "jms.sendAcksAsync=true&"
+ "initialReconnectDelay=100&maxReconnectDelay=30000&"
+ "useExponentialBackOff=true";
final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url);
class CheckOrderClient implements Runnable {
final int id;
int runCount = 0;
public CheckOrderClient(int id) {
this.id = id;
}
@Override
public void run() {
try {
synchronized (this) {
Connection con = clientFactory.createConnection();
con.setClientID("cli" + id);
con.start();
Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
int nextId = 0;
++runCount;
int i=0;
for (; i < messageCount/2; i++) {
Message message = consumer.receiveNoWait();
if (message == null) {
break;
}
long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
assertEquals(id + " expected order: runCount: " + runCount + " id: " + message.getJMSMessageID(), ++nextId, producerSequenceId);
}
LOG.info(con.getClientID() + " peeked " + i);
session.close();
con.close();
}
} catch (Throwable e) {
e.printStackTrace();
exceptions.add(e);
}
}
}
Runnable producer = new Runnable() {
final String payLoad = new String(new byte[600]);
@Override
public void run() {
try {
Connection con = createConnection();
final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = sendSession.createProducer(topic);
for (int i = 0; i < messageCount; i++) {
producer.send(sendSession.createTextMessage(payLoad));
}
LOG.info("About to commit: " + messageCount);
sendSession.commit();
LOG.info("committed: " + messageCount);
con.close();
} catch (Exception e) {
e.printStackTrace();
exceptions.add(e);
}
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
// concurrent commit and activate
for (int i = 0; i < numConsumers; i++) {
final CheckOrderClient client = new CheckOrderClient(i);
for (int j=0; j<100; j++) {
executorService.execute(client);
}
}
executorService.execute(producer);
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.MINUTES);
con.close();
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
// create offline subs 1
Connection con = createConnection("offCli1");