diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 32055976e4..63120a9a1e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -377,6 +377,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ while(pending.hasNext()&&!isFull()){ MessageReference node=pending.next(); pending.remove(); + + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if( node.isExpired() ) { + continue; // just drop it. + } + dispatch(node); } }finally{ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 55354075e1..768583c54a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -17,7 +17,14 @@ */ package org.apache.activemq.broker.region; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -28,7 +35,6 @@ import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; -import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; @@ -51,14 +57,7 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; /** * The Queue is a List of MessageEntry objects that are dispatched to matching @@ -122,7 +121,13 @@ public class Queue implements Destination, Task { store.recover(new MessageRecoveryListener(){ public void recoverMessage(Message message){ - message.setRegionDestination(Queue.this); + // Message could have expired while it was being loaded.. + if( message.isExpired() ) { + // TODO: remove message from store. + return; + } + + message.setRegionDestination(Queue.this); synchronized(messages){ try{ messages.addMessageLast(message); @@ -295,11 +300,23 @@ public class Queue implements Destination, Task { } public void send(final ConnectionContext context,final Message message) throws Exception{ + // There is delay between the client sending it and it arriving at the + // destination.. it may have expired. + if( message.isExpired() ) { + return; + } + if(context.isProducerFlowControl()){ if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); }else{ usageManager.waitForSpace(); + + // The usage manager could have delayed us by the time + // we unblock the message could have expired.. + if( message.isExpired() ) { + return; + } } } message.setRegionDestination(this); @@ -310,6 +327,14 @@ public class Queue implements Destination, Task { context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ + + // It could take while before we receive the commit + // operration.. by that time the message could have expired.. + if( message.isExpired() ) { + // TODO: remove message from store. + return; + } + sendMessage(context,message); } }); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 025432a4e0..2337beb4c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -232,11 +232,23 @@ public class Topic implements Destination { public void send(final ConnectionContext context, final Message message) throws Exception { + // There is delay between the client sending it and it arriving at the + // destination.. it may have expired. + if( message.isExpired() ) { + return; + } + if (context.isProducerFlowControl()) { if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); } else { usageManager.waitForSpace(); + + // The usage manager could have delayed us by the time + // we unblock the message could have expired.. + if( message.isExpired() ) { + return; + } } } @@ -251,6 +263,12 @@ public class Topic implements Destination { if (context.isInTransaction()) { context.getTransaction().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { + // It could take while before we receive the commit + // operration.. by that time the message could have expired.. + if( message.isExpired() ) { + // TODO: remove message from store. + return; + } dispatch(context, message); } }); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 6aab11521b..779296af38 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -325,6 +325,14 @@ public class TopicSubscription extends AbstractSubscription{ for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ MessageReference message=(MessageReference) iter.next(); iter.remove(); + + // Message may have been sitting in the matched list a while + // waiting for the consumer to ak the message. + if( message.isExpired() ) { + message.decrementReferenceCount(); + continue; // just drop it. + } + dispatch(message); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index 06d07a73bf..25c7b748cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -409,7 +409,10 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess } public boolean isExpired() { - // TODO: need to be implemented. + long expireTime = getExpiration(); + if (expireTime > 0 && System.currentTimeMillis() > expireTime) { + return true; + } return false; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java new file mode 100644 index 0000000000..3b048ecb42 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java @@ -0,0 +1,285 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.DeliveryMode; + +import junit.framework.Test; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; + +public class MessageExpirationTest extends BrokerTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode; + public int prefetch; + public byte destinationType; + public boolean durableConsumer; + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) { + Message message = createMessage(producerInfo, destination, deliveryMode); + long now = System.currentTimeMillis(); + message.setTimestamp(now); + message.setExpiration(now+timeToLive); + return message; + } + + + public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() { + addCombinationValues( "deliveryMode", new Object[]{ + new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)} ); + addCombinationValues( "destinationType", new Object[]{ + new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), + new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE), + new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE), + } ); + } + + public void testMessagesWaitingForUssageDecreaseExpire() throws Exception { + + // Start a producer + final StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + final ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + // Start a consumer.. + final StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + + destination = createDestinationInfo(connection2, connectionInfo2, destinationType); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(1); + connection2.send(consumerInfo2); + + // Reduce the limit so that only 1 message can flow through the broker at a time. + broker.getMemoryManager().setLimit(1); + + final Message m1 = createMessage(producerInfo, destination, deliveryMode); + final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000); + final Message m3 = createMessage(producerInfo, destination, deliveryMode); + final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000); + + // Produce in an async thread since the producer will be getting blocked by the usage manager.. + new Thread() { + public void run() { + // m1 and m3 should not expire.. but the others should. + try { + connection.send(m1); + connection.send(m2); + connection.send(m3); + connection.send(m4); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + + + // Make sure only 1 message was delivered due to prefetch == 1 + Message m = receiveMessage(connection2); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + assertNoMessagesLeft(connection); + + // Sleep before we ack so that the messages expire on the usage manager + Thread.sleep(1500); + connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd message sent. + m = receiveMessage(connection2); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + + // Sleep before we ack so that the messages expire on the usage manager + Thread.sleep(1500); + connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection2); + + connection.send(closeConnectionInfo(connectionInfo)); + connection.send(closeConnectionInfo(connectionInfo2)); + } + + + public void initCombosForTestMessagesInLongTransactionExpire() { + addCombinationValues( "deliveryMode", new Object[]{ + new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)} ); + addCombinationValues( "destinationType", new Object[]{ + new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE), + new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), + new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) + } ); + } + + public void testMessagesInLongTransactionExpire() throws Exception { + + // Start a producer and consumer + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + destination = createDestinationInfo(connection, connectionInfo, destinationType); + + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1000); + connection.send(consumerInfo); + + // Start the tx.. + LocalTransactionId txid = createLocalTransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + // m1 and m3 should not expire.. but the others should. + Message m1 = createMessage(producerInfo, destination, deliveryMode); + m1.setTransactionId(txid); + connection.send(m1); + Message m = createMessage(producerInfo, destination, deliveryMode, 1000); + m.setTransactionId(txid); + connection.send(m); + Message m3 = createMessage(producerInfo, destination, deliveryMode); + m3.setTransactionId(txid); + connection.send(m3); + m = createMessage(producerInfo, destination, deliveryMode, 1000); + m.setTransactionId(txid); + connection.send(m); + + // Sleep before we commit so that the messages expire on the commit list.. + Thread.sleep(1500); + connection.send(createCommitTransaction1Phase(connectionInfo, txid)); + + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd message sent. + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection); + + connection.send(closeConnectionInfo(connectionInfo)); + } + + + public void TestMessagesInSubscriptionPendingListExpire() { + addCombinationValues( "deliveryMode", new Object[]{ + new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)} ); + addCombinationValues( "destinationType", new Object[]{ + new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE), + new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), + new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) + } ); + } + + public void initCombosForTestMessagesInSubscriptionPendingListExpire() { + addCombinationValues( "deliveryMode", new Object[]{ + new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)} ); + addCombinationValues( "destinationType", new Object[]{ + new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE), + new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), + new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) + } ); + } + + public void testMessagesInSubscriptionPendingListExpire() throws Exception { + + // Start a producer and consumer + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + destination = createDestinationInfo(connection, connectionInfo, destinationType); + + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1); + connection.send(consumerInfo); + + // m1 and m3 should not expire.. but the others should. + Message m1 = createMessage(producerInfo, destination, deliveryMode); + connection.send(m1); + connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); + Message m3 = createMessage(producerInfo, destination, deliveryMode); + connection.send(m3); + connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); + + // Make sure only 1 message was delivered due to prefetch == 1 + Message m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + assertNoMessagesLeft(connection); + + // Sleep before we ack so that the messages expire on the pending list.. + Thread.sleep(1500); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd message sent. + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection); + + connection.send(closeConnectionInfo(connectionInfo)); + } + + public static Test suite() { + return suite(MessageExpirationTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +}