diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java index f3f3b78544..df3b6587e8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java @@ -31,24 +31,24 @@ import org.apache.activemq.state.ProducerState; */ public final class BrokerSupport { - private BrokerSupport() { + private BrokerSupport() { } - + public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { doResend(context, originalMessage, deadLetterDestination, false); } - + /** * @param context - * @param originalMessage + * @param originalMessage * @param deadLetterDestination * @throws Exception */ public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { doResend(context, originalMessage, deadLetterDestination, true); } - - public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { + + public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { Message message = copy ? originalMessage.copy() : originalMessage; message.setOriginalDestination(message.getDestination()); message.setOriginalTransactionId(message.getTransactionId()); @@ -56,6 +56,7 @@ public final class BrokerSupport { message.setTransactionId(null); message.setMemoryUsage(null); message.setRedeliveryCounter(0); + message.getMessageId().setDataLocator(null); boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java new file mode 100644 index 0000000000..049d68365b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Enumeration; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Once the wire format is completed we can test against real persistence storage. + */ +public class AMQ6059Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class); + + private BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testDLQRecovery() throws Exception { + + sendMessage(new ActiveMQQueue("leveldbQueue")); + TimeUnit.SECONDS.sleep(3); + + LOG.info("### Check for expired message moving to DLQ."); + + Queue dlqQueue = (Queue) createDlqDestination(); + verifyIsDlq(dlqQueue); + + final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQueue.getQueueName()); + + assertTrue("The message expired", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}", + new Object[] { queueViewMBean.getEnqueueCount(), + queueViewMBean.getDispatchCount(), + queueViewMBean.getExpiredCount(), + queueViewMBean.getInFlightCount()}); + return queueViewMBean.getEnqueueCount() == 1; + } + })); + + verifyMessageIsRecovered(dlqQueue); + restartBroker(broker); + verifyMessageIsRecovered(dlqQueue); + } + + protected BrokerService createBroker() throws Exception { + return createBrokerWithDLQ(true); + } + + private BrokerService createBrokerWithDLQ(boolean purge) throws Exception { + BrokerService broker = new BrokerService(); + + File directory = new File("target/activemq-data/leveldb"); + if (purge) { + IOHelper.deleteChildren(directory); + } + + LevelDBStore levelDBStore = new LevelDBStore(); + levelDBStore.setDirectory(directory); + if (purge) { + levelDBStore.deleteAllMessages(); + } + + PolicyMap pMap = new PolicyMap(); + + SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy(); + sharedDLQStrategy.setProcessNonPersistent(true); + sharedDLQStrategy.setProcessExpired(true); + sharedDLQStrategy.setDeadLetterQueue(new ActiveMQQueue("ActiveMQ.DLQ")); + sharedDLQStrategy.setExpiration(10000); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setDeadLetterStrategy(sharedDLQStrategy); + defaultPolicy.setExpireMessagesPeriod(2000); + defaultPolicy.setUseCache(false); + + pMap.put(new ActiveMQQueue(">"), defaultPolicy); + broker.setDestinationPolicy(pMap); + broker.setPersistenceAdapter(levelDBStore); + if (purge) { + broker.setDeleteAllMessagesOnStartup(true); + } + + return broker; + } + + private void restartBroker(BrokerService broker) throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createBrokerWithDLQ(false); + broker.start(); + broker.waitUntilStarted(); + } + + private void verifyMessageIsRecovered(final Queue dlqQueue) throws Exception, JMSException { + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(dlqQueue); + Enumeration elements = browser.getEnumeration(); + assertTrue(elements.hasMoreElements()); + Message browsed = (Message) elements.nextElement(); + assertNotNull("Recover message after broker restarts", browsed); + } + + private void sendMessage(Destination destination) throws Exception { + Connection connection = createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.send(destination, session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000); + connection.stop(); + LOG.info("### Send message that will expire."); + } + + private Connection createConnection() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + return factory.createConnection(); + } + + private Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + + private void verifyIsDlq(Queue dlqQ) throws Exception { + final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName()); + assertTrue("is dlq", queueViewMBean.isDLQ()); + } + + private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + name); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +}