diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index f9cd50fe06..e4fda37d6a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -2159,13 +2159,13 @@ public class BrokerService implements Service { public void run() { try { checkStoreUsageLimits(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Failed to check persistent disk usage limits", e); } try { checkTmpStoreUsageLimits(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Failed to check temporary store usage limits", e); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 183ecd35a7..5e8a4749c5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -39,6 +39,7 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -144,10 +145,21 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index asyncWakeup(); } }; - private final Runnable expireMessagesTask = new Runnable() { + private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false); + private final Runnable expireMessagesWork = new Runnable() { @Override public void run() { expireMessages(); + expiryTaskInProgress.set(false); + } + }; + + private final Runnable expireMessagesTask = new Runnable() { + @Override + public void run() { + if (expiryTaskInProgress.compareAndSet(false, true)) { + taskFactory.execute(expireMessagesWork); + } } }; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 32efab490d..da6b4dcc50 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.InvalidClientIDException; @@ -112,10 +113,26 @@ public class RegionBroker extends EmptyBroker { private boolean allowTempAutoCreationOnSend; private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); + private final TaskRunnerFactory taskRunnerFactory; + private final AtomicBoolean purgeInactiveDestinationsTaskInProgress = new AtomicBoolean(false); private final Runnable purgeInactiveDestinationsTask = new Runnable() { @Override public void run() { - purgeInactiveDestinations(); + if (purgeInactiveDestinationsTaskInProgress.compareAndSet(false, true)) { + taskRunnerFactory.execute(purgeInactiveDestinationsWork); + } + } + }; + private final Runnable purgeInactiveDestinationsWork = new Runnable() { + @Override + public void run() { + try { + purgeInactiveDestinations(); + } catch (Throwable ignored) { + LOG.error("Unexpected exception on purgeInactiveDestinations {}", this, ignored); + } finally { + purgeInactiveDestinationsTaskInProgress.set(false); + } } }; @@ -134,6 +151,7 @@ public class RegionBroker extends EmptyBroker { this.destinationInterceptor = destinationInterceptor; tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); + this.taskRunnerFactory = taskRunnerFactory; } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 3ed4aafd26..a0f2d067d5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.advisory.AdvisorySupport; @@ -75,6 +76,7 @@ public class Topic extends BaseDestination implements Task { private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private final ConcurrentMap durableSubscribers = new ConcurrentHashMap(); private final TaskRunner taskRunner; + private final TaskRunnerFactory taskRunnerFactor; private final LinkedList messagesWaitingForSpace = new LinkedList(); private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { @Override @@ -92,6 +94,7 @@ public class Topic extends BaseDestination implements Task { this.topicStore = store; subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); + this.taskRunnerFactor = taskFactory; } @Override @@ -787,11 +790,21 @@ public class Topic extends BaseDestination implements Task { } } - private final Runnable expireMessagesTask = new Runnable() { + private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false); + private final Runnable expireMessagesWork = new Runnable() { @Override public void run() { List browsedMessages = new InsertionCountList(); doBrowse(browsedMessages, getMaxExpirePageSize()); + expiryTaskInProgress.set(false); + } + }; + private final Runnable expireMessagesTask = new Runnable() { + @Override + public void run() { + if (expiryTaskInProgress.compareAndSet(false, true)) { + taskRunnerFactor.execute(expireMessagesWork); + } } }; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 3a7924e3a6..d9f334960a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -21,8 +21,12 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -39,6 +43,7 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.apache.activemq.TestSupport.getDestination; @@ -150,6 +155,63 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } + + public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception { + final ActiveMQDestination destination = new ActiveMQQueue("test"); + broker = new BrokerService(); + broker.setBrokerName("localhost"); + broker.setDestinations(new ActiveMQDestination[]{destination}); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setExpireMessagesPeriod(1000); + defaultPolicy.setMaxExpirePageSize(2000); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + broker.setDestinationPolicy(policyMap); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + broker.addConnector("tcp://localhost:0"); + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) { + try { + LOG.info("Sleeping before delegation on sendToDeadLetterQueue"); + TimeUnit.SECONDS.sleep(1); + } catch (Exception ignored) {} + return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); + } + }}); + broker.start(); + broker.waitUntilStarted(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "vm://localhost"); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(0); + factory.setPrefetchPolicy(prefetchPolicy); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(1000); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i=0;i<10; i++) { + producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 2000); + } + + consumer = session.createConsumer(new ActiveMQQueue("another-test")); + + for (int i=0; i<10; i++) { + long timeStamp = System.currentTimeMillis(); + consumer.receive(1000); + long duration = System.currentTimeMillis() - timeStamp; + LOG.info("Duration: " + i + " : " + duration); + assertTrue("Delay about 500: " + i + ", actual: " + duration, duration < 1500); + } + } + + private void produce(int num, ActiveMQDestination destination) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "failover://"+brokerUri); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java index 87be689720..7f6a8b5d19 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,6 +36,8 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.junit.After; import org.junit.Assert; @@ -256,6 +259,30 @@ public class TopicSubscriptionZeroPrefetchTest { Assert.assertNotNull("should have received a message the published message", consumedMessage); } + @Test(timeout = 420000) + public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception { + + ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0"); + + for (int i=0; i<500; i++) { + consumer = session.createDurableSubscriber(consumerDestination, "mysub-" + i); + consumer.close(); + } + + for (int i=0;i<1000; i++) { + producer.send(session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 5000); + } + + consumer = session.createDurableSubscriber(consumerDestination, "mysub3"); + for (int i=0; i<10; i++) { + long timeStamp = System.currentTimeMillis(); + consumer.receive(1000); + long duration = System.currentTimeMillis() - timeStamp; + LOG.info("Duration: " + i + " : " + duration); + assertTrue("Delay about 500: " + i, duration < 1500); + } + } + @After public void tearDown() throws Exception { consumer.close(); @@ -272,6 +299,13 @@ public class TopicSubscriptionZeroPrefetchTest { broker.setUseJmx(false); broker.setDeleteAllMessagesOnStartup(true); broker.addConnector("vm://localhost"); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(5000); + policyEntry.setMaxExpirePageSize(2000); + policyEntry.setUseCache(false); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); broker.start(); broker.waitUntilStarted(); return broker;