mirror of https://github.com/apache/activemq.git
Add fix for https://issues.apache.org/jira/browse/AMQ-3153 and add user supplied unit test.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1074289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d245457ccd
commit
a5fc6dd999
|
@ -27,6 +27,7 @@ import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.broker.region.Destination;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
import org.apache.activemq.broker.region.MessageReference;
|
||||||
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.usage.MemoryUsage;
|
import org.apache.activemq.usage.MemoryUsage;
|
||||||
import org.apache.activemq.util.ByteArrayInputStream;
|
import org.apache.activemq.util.ByteArrayInputStream;
|
||||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||||
|
@ -36,9 +37,9 @@ import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an ActiveMQ message
|
* Represents an ActiveMQ message
|
||||||
*
|
*
|
||||||
* @openwire:marshaller
|
* @openwire:marshaller
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
|
public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
|
||||||
|
|
||||||
|
@ -122,6 +123,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
|
|
||||||
if (properties != null) {
|
if (properties != null) {
|
||||||
copy.properties = new HashMap<String, Object>(properties);
|
copy.properties = new HashMap<String, Object>(properties);
|
||||||
|
|
||||||
|
// The new message hasn't expired, so remove this feild.
|
||||||
|
copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
|
||||||
} else {
|
} else {
|
||||||
copy.properties = properties;
|
copy.properties = properties;
|
||||||
}
|
}
|
||||||
|
@ -177,7 +181,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
lazyCreateProperties();
|
lazyCreateProperties();
|
||||||
properties.put(name, value);
|
properties.put(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeProperty(String name) throws IOException {
|
public void removeProperty(String name) throws IOException {
|
||||||
lazyCreateProperties();
|
lazyCreateProperties();
|
||||||
properties.remove(name);
|
properties.remove(name);
|
||||||
|
@ -438,7 +442,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
* consumer id is an active consumer on the broker, the message is dropped.
|
* consumer id is an active consumer on the broker, the message is dropped.
|
||||||
* Used by the AdvisoryBroker to replay advisory messages to a specific
|
* Used by the AdvisoryBroker to replay advisory messages to a specific
|
||||||
* consumer.
|
* consumer.
|
||||||
*
|
*
|
||||||
* @openwire:property version=1 cache=true
|
* @openwire:property version=1 cache=true
|
||||||
*/
|
*/
|
||||||
public ConsumerId getTargetConsumerId() {
|
public ConsumerId getTargetConsumerId() {
|
||||||
|
@ -502,7 +506,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The route of brokers the command has moved through.
|
* The route of brokers the command has moved through.
|
||||||
*
|
*
|
||||||
* @openwire:property version=1 cache=true
|
* @openwire:property version=1 cache=true
|
||||||
*/
|
*/
|
||||||
public BrokerId[] getBrokerPath() {
|
public BrokerId[] getBrokerPath() {
|
||||||
|
@ -541,7 +545,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
* Used to schedule the arrival time of a message to a broker. The broker
|
* Used to schedule the arrival time of a message to a broker. The broker
|
||||||
* will not dispatch a message to a consumer until it's arrival time has
|
* will not dispatch a message to a consumer until it's arrival time has
|
||||||
* elapsed.
|
* elapsed.
|
||||||
*
|
*
|
||||||
* @openwire:property version=1
|
* @openwire:property version=1
|
||||||
*/
|
*/
|
||||||
public long getArrival() {
|
public long getArrival() {
|
||||||
|
@ -556,7 +560,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
* Only set by the broker and defines the userID of the producer connection
|
* Only set by the broker and defines the userID of the producer connection
|
||||||
* who sent this message. This is an optional field, it needs to be enabled
|
* who sent this message. This is an optional field, it needs to be enabled
|
||||||
* on the broker to have this field populated.
|
* on the broker to have this field populated.
|
||||||
*
|
*
|
||||||
* @openwire:property version=1
|
* @openwire:property version=1
|
||||||
*/
|
*/
|
||||||
public String getUserID() {
|
public String getUserID() {
|
||||||
|
@ -589,11 +593,11 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
this.memoryUsage=regionDestination.getMemoryUsage();
|
this.memoryUsage=regionDestination.getMemoryUsage();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public MemoryUsage getMemoryUsage() {
|
public MemoryUsage getMemoryUsage() {
|
||||||
return this.memoryUsage;
|
return this.memoryUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMemoryUsage(MemoryUsage usage) {
|
public void setMemoryUsage(MemoryUsage usage) {
|
||||||
this.memoryUsage=usage;
|
this.memoryUsage=usage;
|
||||||
}
|
}
|
||||||
|
@ -614,7 +618,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
if (rc == 1 && getMemoryUsage() != null) {
|
if (rc == 1 && getMemoryUsage() != null) {
|
||||||
getMemoryUsage().increaseUsage(size);
|
getMemoryUsage().increaseUsage(size);
|
||||||
//System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
|
//System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
|
//System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
|
||||||
|
@ -634,7 +638,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
//Thread.dumpStack();
|
//Thread.dumpStack();
|
||||||
//System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
|
//System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
|
||||||
}
|
}
|
||||||
|
|
||||||
//System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
|
//System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
|
@ -653,7 +657,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
}
|
}
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getMinimumMessageSize() {
|
protected int getMinimumMessageSize() {
|
||||||
int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
|
int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
|
||||||
//let destination override
|
//let destination override
|
||||||
|
@ -697,7 +701,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
/**
|
/**
|
||||||
* If a message is stored in multiple nodes on a cluster, all the cluster
|
* If a message is stored in multiple nodes on a cluster, all the cluster
|
||||||
* members will be listed here. Otherwise, it will be null.
|
* members will be listed here. Otherwise, it will be null.
|
||||||
*
|
*
|
||||||
* @openwire:property version=3 cache=true
|
* @openwire:property version=3 cache=true
|
||||||
*/
|
*/
|
||||||
public BrokerId[] getCluster() {
|
public BrokerId[] getCluster() {
|
||||||
|
@ -734,16 +738,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
public void setBrokerOutTime(long brokerOutTime) {
|
public void setBrokerOutTime(long brokerOutTime) {
|
||||||
this.brokerOutTime = brokerOutTime;
|
this.brokerOutTime = brokerOutTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDropped() {
|
public boolean isDropped() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return toString(null);
|
return toString(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString(Map<String, Object>overrideFields) {
|
public String toString(Map<String, Object>overrideFields) {
|
||||||
try {
|
try {
|
||||||
|
@ -751,5 +755,5 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
}
|
}
|
||||||
return super.toString(overrideFields);
|
return super.toString(overrideFields);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.broker.region.Queue;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
|
||||||
|
|
||||||
|
private static final long MESSAGE_TTL_MILLIS = 1000;
|
||||||
|
private static final long MAX_TEST_TIME_MILLIS = 60000;
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
setAutoFail(true);
|
||||||
|
setMaxTestTime(MAX_TEST_TIME_MILLIS);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test verifies that a message that expires can be be resent to queue
|
||||||
|
* with a new expiration and that it will be processed as a new message and
|
||||||
|
* allowed to re-expire.
|
||||||
|
* <p>
|
||||||
|
* <b>NOTE:</b> This test fails on AMQ 5.4.2 because the originalExpiration
|
||||||
|
* timestamp is not cleared when the message is resent.
|
||||||
|
*/
|
||||||
|
public void testDoubleExpireWithoutMove() throws Exception {
|
||||||
|
// Create the default dead letter queue.
|
||||||
|
final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ");
|
||||||
|
|
||||||
|
Connection conn = createConnection();
|
||||||
|
try {
|
||||||
|
conn.start();
|
||||||
|
Session session = conn.createSession(false,
|
||||||
|
Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
// Verify that the test queue and DLQ are empty.
|
||||||
|
Assert.assertEquals(0, getSize(destination));
|
||||||
|
Assert.assertEquals(0, getSize(DLQ));
|
||||||
|
|
||||||
|
// Enqueue a message to the test queue that will expire after 1s.
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
Message testMessage = session.createTextMessage("test message");
|
||||||
|
producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE,
|
||||||
|
Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
|
||||||
|
Assert.assertEquals(1, getSize(destination));
|
||||||
|
|
||||||
|
// Wait for the message to expire.
|
||||||
|
waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
|
||||||
|
Assert.assertEquals(1, getSize(DLQ));
|
||||||
|
|
||||||
|
// Consume the message from the DLQ and re-enqueue it to the test
|
||||||
|
// queue so that it expires after 1s.
|
||||||
|
MessageConsumer consumer = session.createConsumer(DLQ);
|
||||||
|
Message expiredMessage = consumer.receive();
|
||||||
|
Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage
|
||||||
|
.getJMSMessageID());
|
||||||
|
|
||||||
|
producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE,
|
||||||
|
Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
|
||||||
|
Assert.assertEquals(1, getSize(destination));
|
||||||
|
Assert.assertEquals(0, getSize(DLQ));
|
||||||
|
|
||||||
|
// Verify that the resent message is "different" in that it has
|
||||||
|
// another ID.
|
||||||
|
Assert.assertNotSame(testMessage.getJMSMessageID(), expiredMessage
|
||||||
|
.getJMSMessageID());
|
||||||
|
|
||||||
|
// Wait for the message to re-expire.
|
||||||
|
waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
|
||||||
|
Assert.assertEquals(1, getSize(DLQ));
|
||||||
|
|
||||||
|
// Re-consume the message from the DLQ.
|
||||||
|
Message reexpiredMessage = consumer.receive();
|
||||||
|
Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage
|
||||||
|
.getJMSMessageID());
|
||||||
|
} finally {
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper method that returns the embedded broker's implementation of a
|
||||||
|
* JMS queue.
|
||||||
|
*/
|
||||||
|
private Queue getPhysicalDestination(ActiveMQDestination destination)
|
||||||
|
throws Exception {
|
||||||
|
return (Queue) broker.getAdminView().getBroker().getDestinationMap()
|
||||||
|
.get(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper method that returns the size of the specified queue/topic.
|
||||||
|
*/
|
||||||
|
private long getSize(ActiveMQDestination destination) throws Exception {
|
||||||
|
return getPhysicalDestination(destination) != null ? getPhysicalDestination(
|
||||||
|
destination).getDestinationStatistics().getMessages()
|
||||||
|
.getCount()
|
||||||
|
: 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper method that waits for a destination to reach a certain size.
|
||||||
|
*/
|
||||||
|
private void waitForSize(ActiveMQDestination destination, int size,
|
||||||
|
long timeoutMillis) throws Exception, TimeoutException {
|
||||||
|
long startTimeMillis = System.currentTimeMillis();
|
||||||
|
|
||||||
|
while (getSize(destination) != size
|
||||||
|
&& System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) {
|
||||||
|
Thread.sleep(250);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (getSize(destination) != size) {
|
||||||
|
throw new TimeoutException("Destination "
|
||||||
|
+ destination.getPhysicalName() + " did not reach size "
|
||||||
|
+ size + " within " + timeoutMillis + "ms.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue