fix duplicate detection of messages recovered when space limit is reached and fix cursor cache reenablement when free space becomes available, AMQ-2149

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@760075 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-03-30 18:04:20 +00:00
parent 69946d9805
commit c8e518b2dc
9 changed files with 94 additions and 63 deletions

View File

@ -91,7 +91,7 @@ public class ActiveMQMessageAudit {
} }
/** /**
* Checks if this message has beeb seen before * Checks if this message has been seen before
* *
* @param message * @param message
* @return true if the message is a duplicate * @return true if the message is a duplicate

View File

@ -272,10 +272,21 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
} }
public synchronized boolean isDuplicate(MessageId messageId) { public synchronized boolean isDuplicate(MessageId messageId) {
if (!enableAudit || audit==null) { boolean unique = recordUniqueId(messageId);
return false; rollback(messageId);
return !unique;
} }
return audit.isDuplicate(messageId);
/**
* records a message id and checks if it is a duplicate
* @param messageId
* @return true if id is unique, false otherwise.
*/
public synchronized boolean recordUniqueId(MessageId messageId) {
if (!enableAudit || audit==null) {
return true;
}
return !audit.isDuplicate(messageId);
} }
public synchronized void rollback(MessageId id) { public synchronized void rollback(MessageId id) {

View File

@ -37,7 +37,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected final Destination regionDestination; protected final Destination regionDestination;
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> (); private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private Iterator<Entry<MessageId, Message>> iterator = null; private Iterator<Entry<MessageId, Message>> iterator = null;
protected boolean cacheEnabled=false; private boolean cacheEnabled=false;
protected boolean batchResetNeeded = true; protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false; protected boolean storeHasMessages = false;
protected int size; protected int size;
@ -73,7 +73,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
boolean recovered = false; boolean recovered = false;
if (!isDuplicate(message.getMessageId())) { if (recordUniqueId(message.getMessageId())) {
if (!cached) { if (!cached) {
message.setRegionDestination(regionDestination); message.setRegionDestination(regionDestination);
if( message.getMemoryUsage()==null ) { if( message.getMemoryUsage()==null ) {
@ -157,6 +157,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} else { } else {
if (cacheEnabled) { if (cacheEnabled) {
cacheEnabled=false; cacheEnabled=false;
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size);
}
// sync with store on disabling the cache // sync with store on disabling the cache
if (lastCachedId != null) { if (lastCachedId != null) {
setBatch(lastCachedId); setBatch(lastCachedId);
@ -176,12 +179,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void remove() { public final synchronized void remove() {
size--; size--;
if (size==0 && isStarted() && useCache) {
cacheEnabled=true;
}
if (iterator!=null) { if (iterator!=null) {
iterator.remove(); iterator.remove();
} }
if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
}
cacheEnabled=true;
}
} }
public final synchronized void remove(MessageReference node) { public final synchronized void remove(MessageReference node) {

View File

@ -17,14 +17,11 @@
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.amq.AMQMessageStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -26,5 +26,10 @@ public interface MessageRecoveryListener {
boolean recoverMessage(Message message) throws Exception; boolean recoverMessage(Message message) throws Exception;
boolean recoverMessageReference(MessageId ref) throws Exception; boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace(); boolean hasSpace();
/**
* check if ref is a duplicate but do not record the reference
* @param ref
* @return true if ref is a duplicate
*/
boolean isDuplicate(MessageId ref); boolean isDuplicate(MessageId ref);
} }

View File

@ -381,6 +381,9 @@ public class AMQMessageStore extends AbstractMessageStore {
Entry<MessageId, ReferenceData> entry = iterator.next(); Entry<MessageId, ReferenceData> entry = iterator.next();
try { try {
if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) { if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
if (LOG.isDebugEnabled()) {
LOG.debug("adding message ref:" + entry.getKey());
}
size++; size++;
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -119,7 +119,7 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
do { do {
ReferenceRecord msg = messageContainer.getValue(entry); ReferenceRecord msg = messageContainer.getValue(entry);
if (msg != null ) { if (msg != null ) {
if ( recoverReference(listener, msg)) { if (recoverReference(listener, msg)) {
count++; count++;
lastBatchId = msg.getMessageId(); lastBatchId = msg.getMessageId();
} else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) { } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
@ -181,14 +181,6 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
} }
} }
public void addReferenceFileIdsInUse() {
for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
.getNext(entry)) {
ReferenceRecord msg = (ReferenceRecord)messageContainer.getValue(entry);
addInterest(msg);
}
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId()); removeMessage(ack.getLastMessageId());
} }
@ -274,6 +266,9 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
lock.lock(); lock.lock();
try { try {
batchEntry = messageContainer.getEntry(startAfter); batchEntry = messageContainer.getEntry(startAfter);
if (LOG.isDebugEnabled()) {
LOG.debug("setBatch: " + startAfter);
}
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -184,10 +184,6 @@ public abstract class Usage<T extends Usage> implements Service {
onLimitChange(); onLimitChange();
} }
/*
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
*/
public int getPercentUsage() { public int getPercentUsage() {
synchronized (usageMutex) { synchronized (usageMutex) {
return percentUsage; return percentUsage;
@ -243,8 +239,9 @@ public abstract class Usage<T extends Usage> implements Service {
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) { if (debug) {
LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage); LOG.info("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
} }
if (started.get()) { if (started.get()) {
// Switching from being full to not being full.. // Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) { if (oldPercentUsage >= 100 && newPercentUsage < 100) {

View File

@ -30,17 +30,20 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.broker.util.LoggingBrokerPlugin;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
@ -63,7 +66,7 @@ public class AMQ2149Test extends TestCase {
private final String SEQ_NUM_PROPERTY = "seqNum"; private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75000; final int MESSAGE_LENGTH_BYTES = 75 * 1024;
final int MAX_TO_SEND = 1500; final int MAX_TO_SEND = 1500;
final long SLEEP_BETWEEN_SEND_MS = 3; final long SLEEP_BETWEEN_SEND_MS = 3;
final int NUM_SENDERS_AND_RECEIVERS = 10; final int NUM_SENDERS_AND_RECEIVERS = 10;
@ -73,6 +76,8 @@ public class AMQ2149Test extends TestCase {
Vector<Throwable> exceptions = new Vector<Throwable>(); Vector<Throwable> exceptions = new Vector<Throwable>();
private File dataDirFile; private File dataDirFile;
final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
public void createBroker(Configurer configurer) throws Exception { public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
@ -112,7 +117,7 @@ public class AMQ2149Test extends TestCase {
private class Receiver implements MessageListener { private class Receiver implements MessageListener {
private final String queueName; private final javax.jms.Destination dest;
private final Connection connection; private final Connection connection;
@ -124,13 +129,17 @@ public class AMQ2149Test extends TestCase {
private String lastId = null; private String lastId = null;
public Receiver(String queueName) throws JMSException { public Receiver(javax.jms.Destination dest) throws JMSException {
this.queueName = queueName; this.dest = dest;
connection = new ActiveMQConnectionFactory(BROKER_URL) connection = new ActiveMQConnectionFactory(BROKER_URL)
.createConnection(); .createConnection();
connection.setClientID(dest.toString());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = session.createConsumer(new ActiveMQQueue( if (ActiveMQDestination.transform(dest).isTopic()) {
queueName)); messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
} else {
messageConsumer = session.createConsumer(dest);
}
messageConsumer.setMessageListener(this); messageConsumer.setMessageListener(this);
connection.start(); connection.start();
} }
@ -147,22 +156,22 @@ public class AMQ2149Test extends TestCase {
try { try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % 500) == 0) { if ((seqNum % 500) == 0) {
LOG.info(queueName + " received " + seqNum); LOG.info(dest + " received " + seqNum);
} }
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum) {
LOG.warn(queueName + " received " + seqNum LOG.warn(dest + " received " + seqNum
+ " in msg: " + message.getJMSMessageID() + " in msg: " + message.getJMSMessageID()
+ " expected " + " expected "
+ nextExpectedSeqNum + nextExpectedSeqNum
+ ", lastId: " + lastId + ", lastId: " + lastId
+ ", message:" + message); + ", message:" + message);
fail(queueName + " received " + seqNum + " expected " fail(dest + " received " + seqNum + " expected "
+ nextExpectedSeqNum); + nextExpectedSeqNum);
} }
++nextExpectedSeqNum; ++nextExpectedSeqNum;
lastId = message.getJMSMessageID(); lastId = message.getJMSMessageID();
} catch (Throwable e) { } catch (Throwable e) {
LOG.error(queueName + " onMessage error", e); LOG.error(dest + " onMessage error", e);
exceptions.add(e); exceptions.add(e);
} }
} }
@ -171,7 +180,7 @@ public class AMQ2149Test extends TestCase {
private class Sender implements Runnable { private class Sender implements Runnable {
private final String queueName; private final javax.jms.Destination dest;
private final Connection connection; private final Connection connection;
@ -181,13 +190,12 @@ public class AMQ2149Test extends TestCase {
private volatile long nextSequenceNumber = 0; private volatile long nextSequenceNumber = 0;
public Sender(String queueName) throws JMSException { public Sender(javax.jms.Destination dest) throws JMSException {
this.queueName = queueName; this.dest = dest;
connection = new ActiveMQConnectionFactory(BROKER_URL) connection = new ActiveMQConnectionFactory(BROKER_URL)
.createConnection(); .createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = session.createProducer(new ActiveMQQueue( messageProducer = session.createProducer(dest);
queueName));
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start(); connection.start();
} }
@ -203,14 +211,14 @@ public class AMQ2149Test extends TestCase {
++nextSequenceNumber; ++nextSequenceNumber;
messageProducer.send(message); messageProducer.send(message);
} catch (Exception e) { } catch (Exception e) {
LOG.error(queueName + " send error", e); LOG.error(dest + " send error", e);
exceptions.add(e); exceptions.add(e);
} }
if (SLEEP_BETWEEN_SEND_MS > 0) { if (SLEEP_BETWEEN_SEND_MS > 0) {
try { try {
Thread.sleep(SLEEP_BETWEEN_SEND_MS); Thread.sleep(SLEEP_BETWEEN_SEND_MS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn(queueName + " sleep interrupted", e); LOG.warn(dest + " sleep interrupted", e);
} }
} }
} }
@ -240,7 +248,7 @@ public class AMQ2149Test extends TestCase {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
SystemUsage usage = new SystemUsage(); SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage(); MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS); memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 10 * NUM_SENDERS_AND_RECEIVERS);
usage.setMemoryUsage(memoryUsage); usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage); broker.setSystemUsage(usage);
@ -252,7 +260,8 @@ public class AMQ2149Test extends TestCase {
verifyStats(false); verifyStats(false);
} }
public void testOrderWithRestartAndVMIndex() throws Exception { // no need to run this unless there are some issues with the others
public void noProblem_testOrderWithRestartAndVMIndex() throws Exception {
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = AMQPersistenceAdapterFactory persistenceFactory =
@ -288,7 +297,10 @@ public class AMQ2149Test extends TestCase {
}); });
final Timer timer = new Timer(); final Timer timer = new Timer();
schedualRestartTask(timer, null); schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
}
});
try { try {
verifyOrderedMessageReceipt(); verifyOrderedMessageReceipt();
@ -300,29 +312,27 @@ public class AMQ2149Test extends TestCase {
} }
public void testOrderWithRestartAndNoCache() throws Exception { public void x_testTopicOrderWithRestart() throws Exception {
plugins[0].setLogAll(true);
plugins[0].setLogInternalEvents(false);
PolicyEntry noCache = new PolicyEntry();
noCache.setUseCache(false);
final PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(noCache);
createBroker(new Configurer() { createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.setDestinationPolicy(policyMap);
broker.deleteAllMessages(); broker.deleteAllMessages();
broker.setPlugins(plugins);
} }
}); });
final Timer timer = new Timer(); final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() { schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception { public void configure(BrokerService broker) throws Exception {
broker.setDestinationPolicy(policyMap); broker.setPlugins(plugins);
} }
}); });
try { try {
verifyOrderedMessageReceipt(); verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
} finally { } finally {
timer.cancel(); timer.cancel();
} }
@ -339,6 +349,7 @@ public class AMQ2149Test extends TestCase {
AMQPersistenceAdapterFactory persistenceFactory = AMQPersistenceAdapterFactory persistenceFactory =
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true); persistenceFactory.setForceRecoverReferenceStore(true);
broker.setPlugins(plugins);
broker.deleteAllMessages(); broker.deleteAllMessages();
} }
}); });
@ -349,6 +360,7 @@ public class AMQ2149Test extends TestCase {
AMQPersistenceAdapterFactory persistenceFactory = AMQPersistenceAdapterFactory persistenceFactory =
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true); persistenceFactory.setForceRecoverReferenceStore(true);
broker.setPlugins(plugins);
} }
}); });
@ -408,19 +420,24 @@ public class AMQ2149Test extends TestCase {
} }
private void verifyOrderedMessageReceipt() throws Exception { private void verifyOrderedMessageReceipt() throws Exception {
verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE);
}
private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
Vector<Thread> threads = new Vector<Thread>(); Vector<Thread> threads = new Vector<Thread>();
Vector<Receiver> receivers = new Vector<Receiver>(); Vector<Receiver> receivers = new Vector<Receiver>();
for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) { for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
final String queueName = "test.queue." + i; final javax.jms.Destination destination =
receivers.add(new Receiver(queueName)); ActiveMQDestination.createDestination("test.dest." + i, destinationType);
Thread thread = new Thread(new Sender(queueName)); receivers.add(new Receiver(destination));
Thread thread = new Thread(new Sender(destination));
thread.start(); thread.start();
threads.add(thread); threads.add(thread);
} }
final long expiry = System.currentTimeMillis() + 1000 * 60 * 20; final long expiry = System.currentTimeMillis() + 1000 * 60 * 30;
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement(); Thread sendThread = threads.firstElement();
sendThread.join(1000*10); sendThread.join(1000*10);