diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index e9f218038b..ff16dfc762 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1169,7 +1169,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index List toExpire) throws Exception { for (Iterator i = refs.iterator(); i.hasNext() && l.size() < max;) { QueueMessageReference ref = (QueueMessageReference) i.next(); - if (ref.isExpired()) { + if (ref.isExpired() && (ref.getLockOwner() == null)) { toExpire.add(ref); } else if (l.contains(ref.getMessage()) == false) { l.add(ref.getMessage()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 7c7027fedf..358f946a58 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -52,14 +52,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner final Destination q = (Destination) n.getRegionDestination(); final QueueMessageReference node = (QueueMessageReference)n; final Queue queue = (Queue)q; - - if (n.isExpired()) { - // sync with message expiry processing - if (!broker.isExpired(n)) { - LOG.debug("ignoring ack {}, for already expired message: {}", ack, n); - return; - } - } queue.removeMessage(context, this, node, ack); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java index 956fa40fed..391253ee70 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java @@ -30,6 +30,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +155,7 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { received.acknowledge(); }; - assertEquals("got messages", messageCount + 1, messages.size()); + assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size()); Vector dlqMessages = new Vector(); while ((received = dlqConsumer.receive(1000)) != null) { @@ -159,6 +163,21 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { }; assertEquals("got dlq messages", data.length - 1, dlqMessages.size()); + + final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), ActiveMQDestination.transform(consumerDestination)); + + // wait for all to inflight to expire + assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return view.getInflight().getCount() == 0; + } + })); + assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount()); + + LOG.info("Stats: received: " + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount()); + } /** diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java new file mode 100644 index 0000000000..4ba6526890 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AMQ5274Test { + static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class); + String activemqURL; + BrokerService brokerService; + ActiveMQQueue dest = new ActiveMQQueue("TestQ"); + + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setExpireMessagesPeriod(1000); + policyMap.setDefaultEntry(defaultPolicy); + brokerService.setDestinationPolicy(policyMap); + activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); + brokerService.start(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void test() throws Exception { + LOG.info("Starting Test"); + assertTrue(brokerService.isStarted()); + + produce(); + consumeAndRollback(); + + // check reported queue size using JMX + long queueSize = getQueueSize(); + assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize); + } + + private void consumeAndRollback() throws JMSException, InterruptedException { + ActiveMQConnection connection = createConnection(); + RedeliveryPolicy noRedelivery = new RedeliveryPolicy(); + noRedelivery.setMaximumRedeliveries(0); + connection.setRedeliveryPolicy(noRedelivery); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(dest); + Message m; + while ( (m = consumer.receive(4000)) != null) { + LOG.info("Got:" + m); + TimeUnit.SECONDS.sleep(1); + session.rollback(); + } + connection.close(); + } + + private void produce() throws Exception { + Connection connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(dest); + producer.setTimeToLive(10000); + for (int i=0;i<20;i++) { + producer.send(session.createTextMessage("i="+i)); + } + connection.close(); + } + + private ActiveMQConnection createConnection() throws JMSException { + return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection(); + } + + + public long getQueueSize() throws Exception { + long queueSize = 0; + try { + QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false); + queueSize = queueViewMBean.getQueueSize(); + LOG.info("QueueSize for destination {} is {}", dest, queueSize); + } catch (Exception ex) { + LOG.error("Error retrieving QueueSize from JMX ", ex); + throw ex; + } + return queueSize; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 4c97972f2a..02055991fc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -190,7 +190,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { // memory check assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage()); - assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage()); + assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getUsage()); // verify DLQ MessageConsumer dlqConsumer = createDlqConsumer(connection); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index ebdb5bcf2a..e2ad7f602a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -257,7 +257,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { // first ack delivered after expiry public void testExpiredMessagesWithVerySlowConsumer() throws Exception { createBroker(); - final long queuePrefetch = 600; + final long queuePrefetch = 5; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); connection = factory.createConnection(); @@ -266,7 +266,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { final int ttl = 4000; producer.setTimeToLive(ttl); - final long sendCount = 1500; + final long sendCount = 10; final CountDownLatch receivedOneCondition = new CountDownLatch(1); final CountDownLatch waitCondition = new CountDownLatch(1); @@ -328,10 +328,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { return queuePrefetch == view.getDispatchCount(); } })); - assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() { + assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return sendCount == view.getExpiredCount(); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); + + return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount(); } })); @@ -448,10 +452,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { return queuePrefetch == view.getDispatchCount(); } })); - assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() { + + assertTrue("all non inflight have expired ", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return sendCount == view.getExpiredCount(); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); + + return view.getExpiredCount() > 0 && (view.getEnqueueCount() - view.getInFlightCount()) == view.getExpiredCount(); } }));