mirror of https://github.com/apache/activemq.git
AMQ-7001 - ensure cursor pending cached id list is pruned of futures that end in an exception, fix and test
This commit is contained in:
parent
76490a2c7f
commit
83514ef799
|
@ -20,6 +20,7 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -350,8 +351,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
final Object futureOrLong = candidate.getFutureOrSequenceLong();
|
||||
if (futureOrLong instanceof Future) {
|
||||
Future future = (Future) futureOrLong;
|
||||
if (future.isCancelled()) {
|
||||
it.remove();
|
||||
if (future.isDone()) {
|
||||
if (future.isCancelled()) {
|
||||
it.remove();
|
||||
} else {
|
||||
// check for exception, we may be seeing old state
|
||||
try {
|
||||
future.get(0, TimeUnit.SECONDS);
|
||||
// stale; if we get a result next prune will see Long
|
||||
} catch (ExecutionException expected) {
|
||||
it.remove();
|
||||
} catch (Exception unexpected) {
|
||||
LOG.debug("{} unexpected exception verifying exception state of future", this, unexpected);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// we don't want to wait for work to complete
|
||||
break;
|
||||
|
|
|
@ -219,6 +219,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
this.nonPersistent = nonPersistent;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the persistent Cursor
|
||||
*/
|
||||
public PendingMessageCursor getPersistent() { return this.persistent; }
|
||||
|
||||
@Override
|
||||
public void setMaxBatchSize(int maxBatchSize) {
|
||||
persistent.setMaxBatchSize(maxBatchSize);
|
||||
|
|
|
@ -20,9 +20,14 @@ package org.apache.activemq.store.kahadb;
|
|||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
|
||||
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.store.TransactionIdTransformer;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -33,10 +38,14 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.io.File;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
||||
|
@ -120,4 +129,58 @@ public class ErrorOnFutureSendTest {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSuccessiveFailedSendsDoesNotConsumeMemInError() throws Exception {
|
||||
|
||||
adapter.setTransactionIdTransformer(new TransactionIdTransformer() {
|
||||
@Override
|
||||
public TransactionId transform(TransactionId txid) {
|
||||
throw new RuntimeException("Bla");
|
||||
}
|
||||
});
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
|
||||
connectionFactory.setWatchTopicAdvisories(false);
|
||||
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createMessage();
|
||||
|
||||
final AtomicInteger received = new AtomicInteger();
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
received.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
final int numIterations = 10;
|
||||
for (int i=0; i<numIterations; i++) {
|
||||
try {
|
||||
producer.send(message);
|
||||
fail("Expect exception");
|
||||
} catch (JMSException expected) {}
|
||||
}
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return received.get() == numIterations;
|
||||
}
|
||||
});
|
||||
consumer.close();
|
||||
connection.close();
|
||||
|
||||
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
||||
Queue queue = (Queue) regionBroker.getQueueRegion().getDestinationMap().get(destination);
|
||||
StoreQueueCursor storeQueueCursor = (StoreQueueCursor) queue.getMessages();
|
||||
PendingMessageCursor queueStorePrefetch = storeQueueCursor.getPersistent();
|
||||
LOG.info("QueueStorePrefetch {}", queueStorePrefetch);
|
||||
String toString = queueStorePrefetch.toString();
|
||||
assertTrue("contains pendingCachedIds.size:1", toString.contains("pendingCachedIds.size:1"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue