mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6188 - reset BaseDestination.lastActiveTime each time a message is delivered to the broker.
This commit is contained in:
parent
7335f699f2
commit
552c0f0f7a
|
@ -523,6 +523,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
|
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
|
||||||
|
this.lastActiveTime = 0L;
|
||||||
if (advisoryForDelivery) {
|
if (advisoryForDelivery) {
|
||||||
broker.messageDelivered(context, messageReference);
|
broker.messageDelivered(context, messageReference);
|
||||||
}
|
}
|
||||||
|
@ -777,7 +778,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
@Override
|
@Override
|
||||||
public boolean canGC() {
|
public boolean canGC() {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if (isGcIfInactive()&& this.lastActiveTime != 0l) {
|
if (isGcIfInactive() && this.lastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
|
||||||
if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) {
|
if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) {
|
||||||
result = true;
|
result = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,14 @@
|
||||||
package org.apache.activemq.broker.region;
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageListener;
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
@ -37,8 +39,13 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class DestinationGCTest {
|
public class DestinationGCTest {
|
||||||
|
|
||||||
|
protected static final Logger logger = LoggerFactory.getLogger(DestinationGCTest.class);
|
||||||
|
|
||||||
private final ActiveMQQueue queue = new ActiveMQQueue("TEST");
|
private final ActiveMQQueue queue = new ActiveMQQueue("TEST");
|
||||||
private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER");
|
private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER");
|
||||||
|
|
||||||
|
@ -137,4 +144,34 @@ public class DestinationGCTest {
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testDestinationGcAnonymousProducer() throws Exception {
|
||||||
|
|
||||||
|
final ActiveMQQueue q = new ActiveMQQueue("Q.TEST.ANONYMOUS.PRODUCER");
|
||||||
|
|
||||||
|
brokerService.getAdminView().addQueue(q.getPhysicalName());
|
||||||
|
assertEquals(2, brokerService.getAdminView().getQueues().length);
|
||||||
|
|
||||||
|
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
|
||||||
|
final Connection connection = factory.createConnection();
|
||||||
|
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
// wait for the queue to be marked for GC
|
||||||
|
logger.info("Waiting for '{}' to be marked for GC...", q);
|
||||||
|
Wait.waitFor(new Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getDestination(q).canGC();
|
||||||
|
}
|
||||||
|
}, Wait.MAX_WAIT_MILLIS, 500L);
|
||||||
|
|
||||||
|
// create anonymous producer and send a message
|
||||||
|
logger.info("Sending PERSISTENT message to QUEUE '{}'", q.getPhysicalName());
|
||||||
|
final MessageProducer producer = session.createProducer(null);
|
||||||
|
producer.send(q, session.createTextMessage());
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
assertFalse(brokerService.getDestination(q).canGC());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue