diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index 4a31c86748..774c36de81 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -27,6 +27,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; @@ -36,9 +37,9 @@ import org.apache.activemq.wireformat.WireFormat; /** * Represents an ActiveMQ message - * + * * @openwire:marshaller - * + * */ public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { @@ -122,6 +123,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess if (properties != null) { copy.properties = new HashMap(properties); + + // The new message hasn't expired, so remove this feild. + copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION); } else { copy.properties = properties; } @@ -177,7 +181,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess lazyCreateProperties(); properties.put(name, value); } - + public void removeProperty(String name) throws IOException { lazyCreateProperties(); properties.remove(name); @@ -438,7 +442,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess * consumer id is an active consumer on the broker, the message is dropped. * Used by the AdvisoryBroker to replay advisory messages to a specific * consumer. - * + * * @openwire:property version=1 cache=true */ public ConsumerId getTargetConsumerId() { @@ -502,7 +506,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess /** * The route of brokers the command has moved through. - * + * * @openwire:property version=1 cache=true */ public BrokerId[] getBrokerPath() { @@ -541,7 +545,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess * Used to schedule the arrival time of a message to a broker. The broker * will not dispatch a message to a consumer until it's arrival time has * elapsed. - * + * * @openwire:property version=1 */ public long getArrival() { @@ -556,7 +560,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess * Only set by the broker and defines the userID of the producer connection * who sent this message. This is an optional field, it needs to be enabled * on the broker to have this field populated. - * + * * @openwire:property version=1 */ public String getUserID() { @@ -589,11 +593,11 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess this.memoryUsage=regionDestination.getMemoryUsage(); } } - + public MemoryUsage getMemoryUsage() { return this.memoryUsage; } - + public void setMemoryUsage(MemoryUsage usage) { this.memoryUsage=usage; } @@ -614,7 +618,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess if (rc == 1 && getMemoryUsage() != null) { getMemoryUsage().increaseUsage(size); //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); - + } //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); @@ -634,7 +638,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess //Thread.dumpStack(); //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); } - + //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); return rc; @@ -653,7 +657,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess } return size; } - + protected int getMinimumMessageSize() { int result = DEFAULT_MINIMUM_MESSAGE_SIZE; //let destination override @@ -697,7 +701,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess /** * If a message is stored in multiple nodes on a cluster, all the cluster * members will be listed here. Otherwise, it will be null. - * + * * @openwire:property version=3 cache=true */ public BrokerId[] getCluster() { @@ -734,16 +738,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess public void setBrokerOutTime(long brokerOutTime) { this.brokerOutTime = brokerOutTime; } - + public boolean isDropped() { return false; } - + @Override public String toString() { return toString(null); } - + @Override public String toString(MapoverrideFields) { try { @@ -751,5 +755,5 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess } catch (IOException e) { } return super.toString(overrideFields); - } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java new file mode 100644 index 0000000000..bb669434e2 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.util.concurrent.TimeoutException; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.Assert; + +public class DoubleExpireTest extends EmbeddedBrokerTestSupport { + + private static final long MESSAGE_TTL_MILLIS = 1000; + private static final long MAX_TEST_TIME_MILLIS = 60000; + + public void setUp() throws Exception { + setAutoFail(true); + setMaxTestTime(MAX_TEST_TIME_MILLIS); + super.setUp(); + } + + /** + * This test verifies that a message that expires can be be resent to queue + * with a new expiration and that it will be processed as a new message and + * allowed to re-expire. + *

+ * NOTE: This test fails on AMQ 5.4.2 because the originalExpiration + * timestamp is not cleared when the message is resent. + */ + public void testDoubleExpireWithoutMove() throws Exception { + // Create the default dead letter queue. + final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ"); + + Connection conn = createConnection(); + try { + conn.start(); + Session session = conn.createSession(false, + Session.AUTO_ACKNOWLEDGE); + + // Verify that the test queue and DLQ are empty. + Assert.assertEquals(0, getSize(destination)); + Assert.assertEquals(0, getSize(DLQ)); + + // Enqueue a message to the test queue that will expire after 1s. + MessageProducer producer = session.createProducer(destination); + Message testMessage = session.createTextMessage("test message"); + producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, + Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); + Assert.assertEquals(1, getSize(destination)); + + // Wait for the message to expire. + waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); + Assert.assertEquals(1, getSize(DLQ)); + + // Consume the message from the DLQ and re-enqueue it to the test + // queue so that it expires after 1s. + MessageConsumer consumer = session.createConsumer(DLQ); + Message expiredMessage = consumer.receive(); + Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage + .getJMSMessageID()); + + producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, + Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS); + Assert.assertEquals(1, getSize(destination)); + Assert.assertEquals(0, getSize(DLQ)); + + // Verify that the resent message is "different" in that it has + // another ID. + Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage + .getJMSMessageID()); + + // Wait for the message to re-expire. + waitForSize(destination, 0, MAX_TEST_TIME_MILLIS); + Assert.assertEquals(1, getSize(DLQ)); + + // Re-consume the message from the DLQ. + Message reexpiredMessage = consumer.receive(); + Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage + .getJMSMessageID()); + } finally { + conn.close(); + } + } + + /** + * A helper method that returns the embedded broker's implementation of a + * JMS queue. + */ + private Queue getPhysicalDestination(ActiveMQDestination destination) + throws Exception { + return (Queue) broker.getAdminView().getBroker().getDestinationMap() + .get(destination); + } + + /** + * A helper method that returns the size of the specified queue/topic. + */ + private long getSize(ActiveMQDestination destination) throws Exception { + return getPhysicalDestination(destination) != null ? getPhysicalDestination( + destination).getDestinationStatistics().getMessages() + .getCount() + : 0; + } + + /** + * A helper method that waits for a destination to reach a certain size. + */ + private void waitForSize(ActiveMQDestination destination, int size, + long timeoutMillis) throws Exception, TimeoutException { + long startTimeMillis = System.currentTimeMillis(); + + while (getSize(destination) != size + && System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) { + Thread.sleep(250); + } + + if (getSize(destination) != size) { + throw new TimeoutException("Destination " + + destination.getPhysicalName() + " did not reach size " + + size + " within " + timeoutMillis + "ms."); + } + } +}