mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3507 - Large number of expiring messages causing QueueSize to groww. have cursor delegate to the destination for expiry processing. additional test. This will also sort out the intermittent failure of this test.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1173583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e23afb3391
commit
e6a9ae2f31
|
@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.IndirectMessageReference;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.QueueMessageReference;
|
||||
import org.apache.activemq.command.Message;
|
||||
|
@ -233,7 +234,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
discard(node);
|
||||
discardExpiredMessage(node);
|
||||
}
|
||||
//message expired
|
||||
return true;
|
||||
|
@ -279,7 +280,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
discard(node);
|
||||
discardExpiredMessage(node);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -410,7 +411,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
while (!tmpList.isEmpty()) {
|
||||
MessageReference node = tmpList.removeFirst();
|
||||
if (node.isExpired()) {
|
||||
discard(node);
|
||||
node.decrementReferenceCount();
|
||||
discardExpiredMessage(node);
|
||||
} else {
|
||||
memoryList.add(node);
|
||||
}
|
||||
|
@ -463,14 +465,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
return diskList;
|
||||
}
|
||||
|
||||
protected void discard(MessageReference message) {
|
||||
message.decrementReferenceCount();
|
||||
private void discardExpiredMessage(MessageReference reference) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Discarding message " + message);
|
||||
LOG.debug("Discarding expired message " + reference);
|
||||
}
|
||||
if (broker.isExpired(reference)) {
|
||||
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
|
||||
context.setBroker(broker);
|
||||
reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
|
||||
}
|
||||
ConnectionContext ctx = new ConnectionContext(new NonCachedMessageEvaluationContext());
|
||||
ctx.setBroker(broker);
|
||||
broker.getRoot().sendToDeadLetterQueue(ctx, message, null);
|
||||
}
|
||||
|
||||
protected ByteSequence getByteSequence(Message message) throws IOException {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
|
@ -72,14 +73,18 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
protected void createBrokerWithMemoryLimit() throws Exception {
|
||||
doCreateBroker(true);
|
||||
createBrokerWithMemoryLimit(800);
|
||||
}
|
||||
|
||||
protected void createBrokerWithMemoryLimit(int expireMessagesPeriod) throws Exception {
|
||||
doCreateBroker(true, expireMessagesPeriod);
|
||||
}
|
||||
|
||||
protected void createBroker() throws Exception {
|
||||
doCreateBroker(false);
|
||||
doCreateBroker(false, 800);
|
||||
}
|
||||
|
||||
private void doCreateBroker(boolean memoryLimit) throws Exception {
|
||||
private void doCreateBroker(boolean memoryLimit, int expireMessagesPeriod) throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setBrokerName("localhost");
|
||||
broker.setUseJmx(true);
|
||||
|
@ -89,7 +94,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setOptimizedDispatch(optimizedDispatch);
|
||||
defaultEntry.setExpireMessagesPeriod(800);
|
||||
defaultEntry.setExpireMessagesPeriod(expireMessagesPeriod);
|
||||
defaultEntry.setMaxExpirePageSize(800);
|
||||
|
||||
defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
|
||||
|
@ -109,6 +114,78 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
||||
}
|
||||
|
||||
public void testExpiredNonPersistentMessagesWithNoConsumer() throws Exception {
|
||||
|
||||
createBrokerWithMemoryLimit(2000);
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(destination);
|
||||
producer.setTimeToLive(1000);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
connection.start();
|
||||
final long sendCount = 2000;
|
||||
|
||||
final Thread producingThread = new Thread("Producing Thread") {
|
||||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
long tStamp = System.currentTimeMillis();
|
||||
while (i++ < sendCount) {
|
||||
producer.send(session.createTextMessage("test"));
|
||||
if (i%100 == 0) {
|
||||
LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
|
||||
tStamp = System.currentTimeMillis() ;
|
||||
}
|
||||
|
||||
if (135 == i) {
|
||||
// allow pending messages to expire, before usage limit kicks in to flush them
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
}
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
producingThread.start();
|
||||
|
||||
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
producingThread.join(TimeUnit.SECONDS.toMillis(3000));
|
||||
return !producingThread.isAlive();
|
||||
}
|
||||
}));
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
final DestinationViewMBean view = createView(destination);
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
try {
|
||||
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||
+ ", size= " + view.getQueueSize());
|
||||
return view.getDequeueCount() != 0
|
||||
&& view.getDequeueCount() == view.getExpiredCount()
|
||||
&& view.getDequeueCount() == view.getEnqueueCount()
|
||||
&& view.getQueueSize() == 0;
|
||||
} catch (Exception ignored) {
|
||||
LOG.info(ignored.toString());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, Wait.MAX_WAIT_MILLIS * 10);
|
||||
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||
+ ", size= " + view.getQueueSize());
|
||||
|
||||
assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
|
||||
assertEquals("0 queue", 0, view.getQueueSize());
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestExpiredMessagesWithNoConsumer() {
|
||||
addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
|
||||
|
|
Loading…
Reference in New Issue