resolve https://issues.apache.org/activemq/browse/AMQ-2487 - usage management of storecursor iterator was broken in that a browse would decrement the usage. memory management across move and retry operations is now correct. modified some tests to validate memory usage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@882096 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-19 10:33:41 +00:00
parent d7b0408803
commit 9b260dc37e
7 changed files with 37 additions and 15 deletions

View File

@ -723,11 +723,7 @@ public class RegionBroker extends EmptyBroker {
.getDestination());
if (context.getBroker()==null) {
context.setBroker(getRoot());
}
// Clear out the memory usage for the old queue.
// We'll reset it to the DLQ below:
message.setMemoryUsage(null);
}
BrokerSupport.resendNoCopy(context,message,
deadLetterDestination);
}

View File

@ -43,6 +43,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;
private boolean started=false;
protected MessageReference last = null;
public synchronized void start() throws Exception {

View File

@ -113,6 +113,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private synchronized void clearIterator(boolean ensureIterator) {
boolean haveIterator = this.iterator != null;
this.iterator=null;
last = null;
if(haveIterator&&ensureIterator) {
ensureIterator();
}
@ -142,11 +143,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final synchronized MessageReference next() {
Message result = null;
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
result = this.iterator.next().getValue();
result.decrementReferenceCount();
}
last = result;
return result;
}
@ -182,6 +183,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (iterator!=null) {
iterator.remove();
}
if (last != null) {
last.decrementReferenceCount();
}
if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");

View File

@ -58,8 +58,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean iterating;
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null;
/**
* @param name
* @param store

View File

@ -35,8 +35,6 @@ import org.apache.activemq.broker.region.QueueMessageReference;
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
private Iterator<MessageReference> iter;
private MessageReference last;
public VMPendingMessageCursor(){
this.useCache=false;
}

View File

@ -52,6 +52,7 @@ public final class BrokerSupport {
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
message.setMemoryUsage(null);
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);

View File

@ -40,6 +40,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
@ -120,6 +122,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
messageIDs[i] = messageID;
}
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
echo("About to move " + messageCount + " messages");
@ -138,11 +141,15 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
echo("Now browsing the second queue");
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
long newQueuesize = queue.getQueueSize();
long newQueuesize = queueNew.getQueueSize();
echo("Second queue size: " + newQueuesize);
assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
// check memory usage migration
assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testRetryMessages() throws Exception {
@ -164,7 +171,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
long initialQueueSize = queue.getQueueSize();
echo("current queue size: " + initialQueueSize);
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
// lets create a duff consumer which keeps rolling back...
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -203,6 +210,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
messageIDs[i] = messageID;
}
int dlqMemUsage = dlq.getMemoryPercentUsage();
assertTrue("dlq has some memory usage", dlqMemUsage > 0);
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
echo("About to retry " + messageCount + " messages");
@ -223,6 +234,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
assertEquals("queue size", initialQueueSize, queueSize);
assertEquals("browse queue size", initialQueueSize, actualCount);
assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage());
}
public void testMoveMessagesBySelector() throws Exception {
@ -246,6 +261,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
public void testCopyMessagesBySelector() throws Exception {
@ -272,6 +288,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
@ -528,7 +545,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
answer.setPersistent(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.setUseJmx(true);
//answer.setEnableStatistics(true);
// apply memory limit so that %usage is visible
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setMemoryLimit(1024*1024*4);
policyMap.setDefaultEntry(defaultEntry);
answer.setDestinationPolicy(policyMap);
answer.addConnector(bindAddress);
return answer;
}