diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index fc0603aaaa..4de918f4a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -644,6 +644,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (ack != null) { deliveredMessages.clear(); ackCounter = 0; + } else { + ack = pendingAck; + pendingAck = null; } } } else if (pendingAck != null && pendingAck.isStandardAck()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 309db4ace6..6f485e9b9d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -69,7 +69,7 @@ public class IndirectMessageReference implements QueueMessageReference { } public String toString() { - return "Message " + message.getMessageId() + " dropped=" + dropped + " locked=" + (lockOwner != null); + return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); } public void incrementRedeliveryCounter() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 2502317896..515b8e9f0f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -303,9 +303,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription { int index = 0; for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { final MessageReference node = iter.next(); - if( node.isExpired() ) { - node.getRegionDestination().messageExpired(context, this, node); + if (hasNotAlreadyExpired(node)) { + if (node.isExpired()) { + node.getRegionDestination().messageExpired(context, this, node); + dispatched.remove(node); + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + } + } else { + // already expired dispatched.remove(node); + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } if (ack.getLastMessageId().equals(node.getMessageId())) { prefetchExtension = Math.max(prefetchExtension, index + 1); @@ -411,6 +418,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } + private boolean hasNotAlreadyExpired(MessageReference node) { + boolean hasNotExpired = true; + try { + hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION) == null; + } catch (IOException e) { + LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION + " for " + node, e); + } + return hasNotExpired; + } + /** * Checks an ack versus the contents of the dispatched list. * @@ -545,6 +562,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription { List rc = new ArrayList(); synchronized(pendingLock) { super.remove(context, destination); + // Here is a potential problem concerning Inflight stat: + // Messages not already committed or rolled back may not be removed from dispatched list at the moment + // Except if each commit or rollback callback action comes before remove of subscriber. + rc.addAll(pending.remove(context, destination)); + // Synchronized to DispatchLock synchronized(dispatchLock) { for (MessageReference r : dispatched) { @@ -552,12 +574,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { rc.add((QueueMessageReference)r); } } - } - // TODO Dispatched messages should be decremented from Inflight stat - // Here is a potential problem concerning Inflight stat: - // Messages not already committed or rolled back may not be removed from dispatched list at the moment - // Except if each commit or rollback callback action comes before remove of subscriber. - rc.addAll(pending.remove(context, destination)); + destination.getDestinationStatistics().getDispatched().subtract(dispatched.size()); + destination.getDestinationStatistics().getInflight().subtract(dispatched.size()); + dispatched.clear(); + } } return rc; } @@ -661,12 +681,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node.getRegionDestination() != null) { if (node != QueueMessageReference.NULL_MESSAGE) { node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); - node.getRegionDestination().getDestinationStatistics().getInflight().increment(); + node.getRegionDestination().getDestinationStatistics().getInflight().increment(); + if (LOG.isTraceEnabled()) { + LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId() + + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount() + + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount()); + } } } - if (LOG.isTraceEnabled()) { - LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId()); - } + if (info.isDispatchAsync()) { try { dispatchPending(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 82a558149a..78d538cb87 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -205,7 +205,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Message could have expired while it was being // loaded.. if (broker.isExpired(message)) { - messageExpired(createConnectionContext(), null, message, false); + messageExpired(createConnectionContext(), message); return true; } if (hasSpace()) { @@ -343,6 +343,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { // while removing up a subscription. dispatchLock.lock(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + + ", dispatched: " + getDestinationStatistics().getDispatched().getCount() + + ", inflight: " + getDestinationStatistics().getInflight().getCount()); + } synchronized (consumers) { removeFromConsumerList(sub); if (sub.getConsumerInfo().isExclusive()) { @@ -552,10 +558,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { } private void expireMessages() { - LOG.info("expiring messages..."); - + if (LOG.isDebugEnabled()) { + LOG.debug("Expiring messages .."); + } + // just track the insertion count - List l = new AbstractList() { + List browsedMessages = new AbstractList() { int size = 0; @Override @@ -573,7 +581,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { return null; } }; - doBrowse(true, l, getMaxBrowsePageSize()); + doBrowse(true, browsedMessages, this.getMaxExpirePageSize()); } public void gc(){ @@ -750,7 +758,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { addAll(pagedInPendingDispatch, l, max, toExpire); for (MessageReference ref : toExpire) { pagedInPendingDispatch.remove(ref); - messageExpired(connectionContext, ref, false); + messageExpired(connectionContext, ref); } } toExpire.clear(); @@ -758,7 +766,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { addAll(pagedInMessages.values(), l, max, toExpire); } for (MessageReference ref : toExpire) { - messageExpired(connectionContext, ref, false); + messageExpired(connectionContext, ref); } if (l.size() < getMaxBrowsePageSize()) { @@ -771,7 +779,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (node != null) { if (broker.isExpired(node)) { messageExpired(connectionContext, - createMessageReference(node.getMessage()), false); + createMessageReference(node.getMessage())); } else if (l.contains(node.getMessage()) == false) { l.add(node.getMessage()); } @@ -1249,21 +1257,17 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) { - messageExpired(context,null,reference, dispatched); + public void messageExpired(ConnectionContext context,MessageReference reference) { + messageExpired(context,null,reference); } public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { - messageExpired(context, subs, reference, true); - } - - public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) { + if (LOG.isDebugEnabled()) { + LOG.debug("message expired: " + reference); + } broker.messageExpired(context, reference); destinationStatistics.getDequeues().increment(); destinationStatistics.getExpired().increment(); - if (dispatched) { - destinationStatistics.getInflight().decrement(); - } try { removeMessage(context,subs,(QueueMessageReference)reference); } catch (IOException e) { @@ -1349,7 +1353,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { result.add(ref); count++; } else { - messageExpired(createConnectionContext(), ref, false); + messageExpired(createConnectionContext(), ref); } } } finally { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 953b240401..e859db5c04 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -69,6 +69,7 @@ import org.apache.commons.logging.LogFactory; * @version $Revision$ */ public class RegionBroker extends EmptyBroker { + public static final String ORIGINAL_EXPIRATION = "originalExpiration"; private static final Log LOG = LogFactory.getLog(RegionBroker.class); private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); @@ -689,7 +690,7 @@ public class RegionBroker extends EmptyBroker { } long expiration=message.getExpiration(); message.setExpiration(0); - message.setProperty("originalExpiration",new Long( + message.setProperty(ORIGINAL_EXPIRATION,new Long( expiration)); if(!message.isPersistent()){ message.setPersistent(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index c8789da661..749ee181b1 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -277,7 +277,7 @@ public class DurableConsumerTest extends TestCase { executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages); - assertTrue(exceptions.isEmpty()); + assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); } public void testConsumerRecover() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java b/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java index ec8cb2c1e6..43e584eed4 100755 --- a/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java @@ -354,12 +354,7 @@ public class ActiveMQMapMessageTest extends TestCase { mapMessage.onSend(); mapMessage.setContent(mapMessage.getContent()); - try { - mapMessage.getString("String"); - fail("Should throw a Null pointer"); - }catch(NullPointerException e){ - - } + assertNull(mapMessage.getString("String")); mapMessage.clearBody(); mapMessage.setString("String", "String"); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 89063ee2d5..2e981c8b44 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -32,14 +32,13 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - - public class ExpiredMessagesTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class); @@ -60,15 +59,22 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setBrokerName("localhost"); - broker.setDataDirectory("data/"); - broker.setUseJmx(true); - broker.deleteAllMessages(); - broker.addConnector("tcp://localhost:61616"); - broker.start(); - broker.waitUntilStarted(); - } + broker = new BrokerService(); + broker.setBrokerName("localhost"); + broker.setDataDirectory("data/"); + broker.setUseJmx(true); + broker.deleteAllMessages(); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setExpireMessagesPeriod(100); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + broker.setDestinationPolicy(policyMap); + + broker.addConnector("tcp://localhost:61616"); + broker.start(); + broker.waitUntilStarted(); + } public void testExpiredMessages() throws Exception { @@ -93,6 +99,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { Thread.sleep(100); end = System.currentTimeMillis(); } + consumer.close(); } catch (Throwable ex) { ex.printStackTrace(); } @@ -109,6 +116,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { while (i++ < 30000) { producer.send(session.createTextMessage("test")); } + producer.close(); } catch (Throwable ex) { ex.printStackTrace(); } @@ -119,14 +127,23 @@ public class ExpiredMessagesTest extends CombinationTestSupport { consumerThread.join(); producingThread.join(); + session.close(); + Thread.sleep(5000); DestinationViewMBean view = createView(destination); LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount()); - //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount()); + + long expiry = System.currentTimeMillis() + 30000; + while (view.getInFlightCount() > 0 && System.currentTimeMillis() < expiry) { + Thread.sleep(500); + } + LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount()); } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { @@ -146,7 +163,4 @@ public class ExpiredMessagesTest extends CombinationTestSupport { broker.stop(); broker.waitUntilStopped(); } - - - } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 428ad64cbc..d26c39d005 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -16,7 +16,13 @@ */ package org.apache.activemq.usecases; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.MBeanServer; @@ -57,31 +63,45 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { junit.textui.TestRunner.run(suite()); } - protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setBrokerName("localhost"); - broker.setDataDirectory("data/"); - broker.setUseJmx(true); - broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:61616"); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setExpireMessagesPeriod(expiryPeriod); - defaultEntry.setMaxExpirePageSize(200); - // so memory is not consumed by DLQ turn if off - defaultEntry.setDeadLetterStrategy(null); - defaultEntry.setMemoryLimit(200*1000); - policyMap.setDefaultEntry(defaultEntry); + protected void createBrokerWithMemoryLimit() throws Exception { + doCreateBroker(true); + } + + protected void createBroker() throws Exception { + doCreateBroker(false); + } + + private void doCreateBroker(boolean memoryLimit) throws Exception { + broker = new BrokerService(); + broker.setBrokerName("localhost"); + broker.setDataDirectory("data/"); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://localhost:61616"); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(expiryPeriod); + defaultEntry.setMaxExpirePageSize(200); + + if (memoryLimit) { + // so memory is not consumed by DLQ turn if off + defaultEntry.setDeadLetterStrategy(null); + defaultEntry.setMemoryLimit(200 * 1000); + } + + policyMap.setDefaultEntry(defaultEntry); broker.setDestinationPolicy(policyMap); + + broker.start(); + + broker.waitUntilStarted(); + } - broker.start(); - - broker.waitUntilStarted(); - } - - public void testExpiredMessages() throws Exception { + public void testExpiredMessagesWithNoConsumer() throws Exception { + createBrokerWithMemoryLimit(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -121,7 +141,89 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { DestinationViewMBean view = createView(destination); assertEquals("All sent have expired ", sendCount, view.getExpiredCount()); } + + + public void testExpiredMessagesWitVerySlowConsumer() throws Exception { + createBroker(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connection = factory.createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + producer = session.createProducer(destination); + final int ttl = 4000; + producer.setTimeToLive(ttl); + + final long sendCount = 1001; + final CountDownLatch receivedOneCondition = new CountDownLatch(1); + final CountDownLatch waitCondition = new CountDownLatch(1); + + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + try { + LOG.info("Got my message: " + message); + receivedOneCondition.countDown(); + waitCondition.await(60, TimeUnit.SECONDS); + message.acknowledge(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + connection.start(); + + + Thread producingThread = new Thread("Producing Thread") { + public void run() { + try { + int i = 0; + long tStamp = System.currentTimeMillis(); + while (i++ < sendCount) { + producer.send(session.createTextMessage("test")); + if (i%100 == 0) { + LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); + tStamp = System.currentTimeMillis() ; + } + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + + final long expiry = System.currentTimeMillis() + 20*1000; + while (producingThread.isAlive() && expiry > System.currentTimeMillis()) { + producingThread.join(1000); + } + + assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS)); + assertTrue("producer completed within time ", !producingThread.isAlive()); + + Thread.sleep(2 * Math.max(ttl, expiryPeriod)); + DestinationViewMBean view = createView(destination); + + assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount()); + assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount()); + + + // let the ack happen + waitCondition.countDown(); + + Thread.sleep(Math.max(ttl, expiryPeriod)); + + assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount()); + + assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount()); + + consumer.close(); + LOG.info("done: " + getName()); + } + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); String domain = "org.apache.activemq";