https://issues.apache.org/jira/browse/AMQ-3167 - possible skipped Queue messages in memory limited configuration with fast consumers

resolve off by one error in messageOrderIndex setBatch

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1064660 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-01-28 13:25:22 +00:00
parent aadd682d09
commit acc3d4f7cb
5 changed files with 84 additions and 33 deletions

View File

@ -168,27 +168,30 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (!cacheEnabled && size==0 && isStarted() && useCache && hasSpace()) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on empty add");
}
cacheEnabled=true;
}
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
lastCachedId = node.getMessageId();
} else {
if (cacheEnabled) {
cacheEnabled=false;
if (hasSpace()) {
if (!cacheEnabled && size==0 && isStarted() && useCache) {
if (LOG.isTraceEnabled()) {
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
+ ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
+ " current node seqId: " + node.getMessageId().getBrokerSequenceId());
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+ " enabling cache for empty store " + node.getMessageId());
}
// sync with store on disabling the cache
if (lastCachedId != null) {
setBatch(lastCachedId);
cacheEnabled=true;
}
if (cacheEnabled) {
recoverMessage(node.getMessage(),true);
lastCachedId = node.getMessageId();
}
} else if (cacheEnabled) {
cacheEnabled=false;
// sync with store on disabling the cache
if (lastCachedId != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size
+ ", lastCachedId: " + lastCachedId
+ " current node Id: " + node.getMessageId());
}
setBatch(lastCachedId);
lastCachedId = null;
}
}
this.storeHasMessages = true;

View File

@ -487,15 +487,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext()
&& listener.hasSpace();) {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
listener.hasSpace() && iterator.hasNext(); ) {
entry = iterator.next();
Message msg = loadMessage(entry.getValue().location);
//System.err.println("RECOVER " + msg.getMessageId().getProducerSequenceId());
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned || listener.hasSpace() == false) {
if (counter >= maxReturned) {
break;
}
}
@ -531,7 +529,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// operations... but for now we must
// externally synchronize...
indexLock.readLock().lock();
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@ -543,7 +541,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
});
}finally {
indexLock.readLock().unlock();
indexLock.writeLock().unlock();
}
} finally {

View File

@ -2136,18 +2136,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (sequence != null) {
Long nextPosition = new Long(sequence.longValue() + 1);
if (defaultPriorityIndex.containsKey(tx, sequence)) {
lastDefaultKey = nextPosition;
lastDefaultKey = sequence;
cursor.defaultCursorPosition = nextPosition.longValue();
} else if (highPriorityIndex != null) {
if (highPriorityIndex.containsKey(tx, sequence)) {
lastHighKey = nextPosition;
lastHighKey = sequence;
cursor.highPriorityCursorPosition = nextPosition.longValue();
} else if (lowPriorityIndex.containsKey(tx, sequence)) {
lastLowKey = nextPosition;
lastLowKey = sequence;
cursor.lowPriorityCursorPosition = nextPosition.longValue();
}
} else {
lastDefaultKey = nextPosition;
lastDefaultKey = sequence;
cursor.defaultCursorPosition = nextPosition.longValue();
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.bugs;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -43,8 +45,14 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AMQ2413Test extends CombinationTestSupport implements MessageListener {
private static final Log LOG = LogFactory.getLog(AMQ2413Test.class);
BrokerService broker;
private ActiveMQConnectionFactory factory;
@ -53,6 +61,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
private static final int RECEIVER_THINK_TIME = 1;
private static final int CONSUMER_COUNT = 1;
private static final int PRODUCER_COUNT = 50;
private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT;
public int deliveryMode = DeliveryMode.NON_PERSISTENT;
public int ackMode = Session.DUPS_OK_ACKNOWLEDGE;
@ -75,6 +84,9 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
broker = new BrokerService();
broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
broker.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
kahaDb.setConcurrentStoreAndDispatchQueues(false);
broker.addConnector("tcp://0.0.0.0:2401");
PolicyMap policies = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
@ -150,8 +162,9 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
public void onMessage(Message message) {
receivedMessages.release();
if (count.incrementAndGet() % 100 == 0) {
System.out.println("Received message " + count);
LOG.info("Received message " + count);
}
track(message);
if (RECEIVER_THINK_TIME > 0) {
try {
Thread.currentThread().sleep(RECEIVER_THINK_TIME);
@ -162,6 +175,26 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
}
HashMap<ProducerId, boolean[]> tracker = new HashMap<ProducerId, boolean[]>();
private synchronized void track(Message message) {
try {
MessageId id = new MessageId(message.getJMSMessageID());
ProducerId pid = id.getProducerId();
int seq = (int)id.getProducerSequenceId();
boolean[] ids = tracker.get(pid);
if (ids == null) {
ids = new boolean[TO_SEND + 1];
ids[seq] = true;
tracker.put(pid, ids);
} else {
assertTrue("not already received: " + id, !ids[seq]);
ids[seq] = true;
}
} catch (Exception e) {
LOG.error(e);
}
}
/**
* @throws InterruptedException
* @throws TimeoutException
@ -172,6 +205,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
while (count.get() < SEND_COUNT) {
if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
if (count.get() == SEND_COUNT) break;
verifyTracking();
throw new TimeoutException("@count=" + count.get() + " Message not received for more than " + HANG_THRESHOLD + " seconds");
}
}
@ -180,6 +214,19 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
}
}
private void verifyTracking() {
Vector<MessageId> missing = new Vector<MessageId>();
for (ProducerId pid : tracker.keySet()) {
boolean[] ids = tracker.get(pid);
for (int i=1; i<TO_SEND + 1; i++) {
if (!ids[i]) {
missing.add(new MessageId(pid, i));
}
}
}
assertTrue("No missing messages: " + missing, missing.isEmpty());
}
private interface Service {
public void start() throws Exception;
@ -210,12 +257,13 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
public void run() {
int count = SEND_COUNT / PRODUCER_COUNT;
for (int i = 1; i <= count; i++) {
int i = 1;
for (; i <= TO_SEND; i++) {
try {
if (+i % 100 == 0) {
System.out.println(thread.currentThread().getName() + " Sending message " + i);
LOG.info(thread.currentThread().getName() + " Sending message " + i);
}
message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
@ -226,6 +274,7 @@ public class AMQ2413Test extends CombinationTestSupport implements MessageListen
break;
}
}
LOG.info(thread.currentThread().getName() + " Sent: " + (i-1));
}
public void close() {

View File

@ -67,6 +67,7 @@ public class AMQ3145Test {
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:0");
broker.start();
broker.waitUntilStarted();
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
connection = factory.createConnection();
connection.start();