From 66cfc7bab3dfa2e079bbc5276312c97ab02cae4f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 11 Jan 2016 10:57:38 -0500 Subject: [PATCH] AMQ-6121 AMQ-6122 Prevent messages on DLQ for looping back onto the same DLQ. Prevents expired messages from bouncing back and duplicate messages from the store from causing a deadlock. --- .../activemq/broker/region/RegionBroker.java | 8 +- .../broker/policy/DeadLetterTestSupport.java | 8 +- .../org/apache/activemq/bugs/AMQ6121Test.java | 149 +++++++++++++++++ .../org/apache/activemq/bugs/AMQ6122Test.java | 158 ++++++++++++++++++ 4 files changed, 319 insertions(+), 4 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6121Test.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6122Test.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index e975b4c014..961618de8b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -775,6 +775,13 @@ public class RegionBroker extends EmptyBroker { DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy(); if (deadLetterStrategy != null) { if (deadLetterStrategy.isSendToDeadLetterQueue(message)) { + ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription); + // Prevent a DLQ loop where same message is sent from a DLQ back to itself + if (deadLetterDestination.equals(message.getDestination())) { + LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); + return false; + } + // message may be inflight to other subscriptions so do not modify message = message.copy(); long dlqExpiration = deadLetterStrategy.getExpiration(); @@ -796,7 +803,6 @@ public class RegionBroker extends EmptyBroker { // not get filled when the message is first sent, // it is only populated if the message is routed to // another destination like the DLQ - ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription); ConnectionContext adminContext = context; if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) { adminContext = BrokerSupport.getConnectionContext(this); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index a0a8cc6278..8f9c3ba3ec 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -133,13 +133,15 @@ public abstract class DeadLetterTestSupport extends TestSupport { @Override public boolean isSatisified() throws Exception { - QueueViewMBean dlqView = null; + boolean satisfied = false; + try { - dlqView = getProxyToQueue(dlqQ.getQueueName()); + QueueViewMBean dlqView = getProxyToQueue(dlqQ.getQueueName()); + satisfied = dlqView != null ? dlqView.isDLQ() : false; } catch (Throwable error) { } - return dlqView != null ? dlqView.isDLQ() : false; + return satisfied; } })); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6121Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6121Test.java new file mode 100644 index 0000000000..736136706c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6121Test.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test to ensure DLQ expiring message is not in recursive loop. + */ +public class AMQ6121Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ6121Test.class); + + private BrokerService broker; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + + LevelDBStore levelDBStore = new LevelDBStore(); + File directory = new File("target/activemq-data/myleveldb"); + IOHelper.deleteChildren(directory); + levelDBStore.setDirectory(directory); + levelDBStore.deleteAllMessages(); + + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + PolicyEntry pe = new PolicyEntry(); + + pe.setExpireMessagesPeriod(8000); + pe.setMaxAuditDepth(25); + pe.setUseCache(false); + pe.setLazyDispatch(false); + pe.setOptimizedDispatch(true); + pe.setProducerFlowControl(false); + pe.setEnableAudit(true); + + pe.setQueue(">"); + entries.add(pe); + + policyMap.setPolicyEntries(entries); + + broker.setDestinationPolicy(policyMap); + broker.setPersistenceAdapter(levelDBStore); + // broker.setPersistent(false); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + @Test(timeout = 30000) + public void sendToDLQ() throws Exception { + + final int MSG_COUNT = 50; + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("ActiveMQ.DLQ"); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage txtMessage = session.createTextMessage(); + txtMessage.setText("Test_Message"); + + // Exceed audit so that the entries beyond audit aren't detected as duplicate + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(txtMessage, DeliveryMode.PERSISTENT, 4, 1000l); + } + + final QueueViewMBean view = getProxyToQueue("ActiveMQ.DLQ"); + + LOG.info("WAITING for expiry..."); + + assertTrue("Queue drained of expired", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return view.getQueueSize() == 0; + } + })); + + LOG.info("FINISHED WAITING for expiry."); + + // check the enqueue counter + LOG.info("Queue enqueue counter ==>>>" + view.getEnqueueCount()); + assertEquals("Enqueue size ", MSG_COUNT, view.getEnqueueCount()); + + connection.close(); + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6122Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6122Test.java new file mode 100644 index 0000000000..220d7fe795 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6122Test.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.IOHelper; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ6122Test { + + private BrokerService brokerService; + private EmbeddedDataSource embeddedDataSource; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.deleteAllMessages(); + + // turn off the cache + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + PolicyEntry pe = new PolicyEntry(); + + pe.setUseCache(false); + pe.setExpireMessagesPeriod(0); + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + brokerService.setDestinationPolicy(policyMap); + + embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); + embeddedDataSource.setCreateDatabase("create"); + embeddedDataSource.getConnection().close(); + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + jdbc.setDataSource(embeddedDataSource); + brokerService.setPersistenceAdapter(jdbc); + + jdbc.deleteAllMessages(); + + brokerService.addConnector("tcp://localhost:0"); + brokerService.setAdvisorySupport(false); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + if (embeddedDataSource != null) { + DataSourceServiceSupport.shutdownDefaultDataSource(embeddedDataSource); + } + } + + @Test + public void deadlockOnDuplicateInDLQ() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString() + + "?wireFormat.maxInactivityDuration=0"); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("ActiveMQ.DLQ"); + + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + // send another "real" message. block the send on the broker (use + // asyncSend to allow client to continue) + message.setCorrelationId("blockme"); + SendTask sendTask = new SendTask(activeMQMessageProducer, message); + new Thread(sendTask).start(); + + // create consumer to trigger fill batch (no cache) + // release the previous send. + ActiveMQConnection connectionForConsumer = (ActiveMQConnection) connectionFactory.createConnection(); + connectionForConsumer.start(); + ActiveMQSession sessionForConsumer = (ActiveMQSession) connectionForConsumer.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = sessionForConsumer.createConsumer(dest); + + Message received = messageConsumer.receive(); + assertNotNull("Got message", received); + messageConsumer.close(); + + activeMQConnection.close(); + } + + class SendTask implements Runnable { + + private final Message message; + private final ActiveMQMessageProducer producer; + + SendTask(ActiveMQMessageProducer producer, Message message) { + this.producer = producer; + this.message = message; + } + + @Override + public void run() { + try { + producer.send(message, null); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } +}