From 27b3a7c3440e3e04fae1a1eb766ed13733c27949 Mon Sep 17 00:00:00 2001 From: gtully Date: Mon, 9 Jun 2014 12:31:39 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5212 - ensure uncontented access to concurrent destination map avoides deadlock, rerework https://issues.apache.org/jira/browse/AMQ-4952 to differenciate duplicates from send and recovered messages from the store. https://issues.apache.org/jira/browse/AMQ-3454 benefits from getDestinationMap(destination) to get direct access to the map to determine existance. Additional test --- .../org/apache/activemq/broker/Broker.java | 8 + .../apache/activemq/broker/BrokerFilter.java | 5 + .../apache/activemq/broker/EmptyBroker.java | 5 + .../apache/activemq/broker/ErrorBroker.java | 5 + .../activemq/broker/MutableBrokerFilter.java | 5 + .../broker/region/AbstractRegion.java | 7 +- .../activemq/broker/region/RegionBroker.java | 9 + .../region/cursors/AbstractStoreCursor.java | 13 +- .../security/AuthorizationBroker.java | 2 +- .../activemq/store/jdbc/JDBCMessageStore.java | 7 +- .../activemq/store/kahadb/KahaDBStore.java | 5 +- .../org/apache/activemq/bugs/AMQ5212Test.java | 204 ++++++++++++++++++ 12 files changed, 258 insertions(+), 17 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index c2fd132dcf..5d052e913e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker; import java.net.URI; +import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; @@ -143,6 +144,13 @@ public interface Broker extends Region, Service { */ ActiveMQDestination[] getDestinations() throws Exception; + /** + * return a reference destination map of a region based on the destination type + * @param destination + * @return + */ + public Map getDestinationMap(ActiveMQDestination destination); + /** * Gets a list of all the prepared xa transactions. * diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index b1d3c1877e..132b46dc4e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -72,6 +72,11 @@ public class BrokerFilter implements Broker { return next.getDestinationMap(); } + @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return next.getDestinationMap(destination); + } + @Override public Set getDestinations(ActiveMQDestination destination) { return next.getDestinations(destination); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 2d2e6ba7ab..8185554ac4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -77,6 +77,11 @@ public class EmptyBroker implements Broker { return Collections.EMPTY_MAP; } + @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return Collections.EMPTY_MAP; + } + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index f692d8a2c9..ae42141e95 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -66,6 +66,11 @@ public class ErrorBroker implements Broker { return Collections.EMPTY_MAP; } + @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return Collections.EMPTY_MAP; + } + @Override public Set getDestinations(ActiveMQDestination destination) { return Collections.EMPTY_SET; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 112378a84f..2eea2e840b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -82,6 +82,11 @@ public class MutableBrokerFilter implements Broker { return getNext().getDestinationMap(); } + @Override + public Map getDestinationMap(ActiveMQDestination destination) { + return getNext().getDestinationMap(destination); + } + @Override public Set getDestinations(ActiveMQDestination destination) { return getNext().getDestinations(destination); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index e443d53985..53e8cdd341 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -249,12 +249,7 @@ public abstract class AbstractRegion implements Region { } public Map getDestinationMap() { - destinationsLock.readLock().lock(); - try{ - return destinations; - } finally { - destinationsLock.readLock().unlock(); - } + return destinations; } @SuppressWarnings("unchecked") diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index da6e1fadb3..59b1b92d29 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -134,6 +134,15 @@ public class RegionBroker extends EmptyBroker { return answer; } + @Override + public Map getDestinationMap(ActiveMQDestination destination) { + try { + return getRegion(destination).getDestinationMap(); + } catch (JMSException jmse) { + return Collections.emptyMap(); + } + } + @Override public Set getDestinations(ActiveMQDestination destination) { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index d0b1a39cab..b6f9b7e157 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -100,8 +100,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i recovered = true; storeHasMessages = true; } else { - LOG.warn("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(), message.getPriority() }); - duplicate(message); + if (LOG.isDebugEnabled()) { + LOG.debug(this + " - cursor got duplicate: " + message.getMessageId() + "," + message.getPriority() + ", cached=" + cached, new Throwable("duplicate message detected")); + } else { + LOG.warn("{} - cursor got duplicate {}", regionDestination.getActiveMQDestination(), message.getMessageId()); + } + if (!cached || message.getMessageId().getEntryLocator() != null) { + // came from the store or was added to the jdbc store + duplicate(message); + } } return recovered; } @@ -195,8 +202,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i lastCachedId = node.getMessageId(); lastTx = node.getMessage().getTransactionId(); } else { - LOG.debug(this + " duplicate add {}", node.getMessage(), new Throwable("duplicated detected")); dealWithDuplicates(); + return; } } } else { diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index db482baefb..39d3c5960b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -76,7 +76,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB } protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { - Destination existing = this.getDestinationMap().get(destination); + Destination existing = this.getDestinationMap(destination).get(destination); if (existing != null) { return true; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 3c441b0bcb..968b928a53 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -131,11 +131,8 @@ public class JDBCMessageStore extends AbstractMessageStore { } finally { c.close(); } - if (context != null && context.getXid() != null) { - message.getMessageId().setEntryLocator(sequenceId); - } else { - onAdd(messageId, sequenceId, message.getPriority()); - } + message.getMessageId().setEntryLocator(sequenceId); + onAdd(messageId, sequenceId, message.getPriority()); } @Override diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 74425f1a45..60c0738d04 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -281,8 +281,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (brokerService != null) { RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); if (regionBroker != null) { - Set destinationSet = regionBroker.getDestinations(convert(commandDestination)); - for (Destination destination : destinationSet) { + ActiveMQDestination activeMQDestination = convert(commandDestination); + Destination destination = regionBroker.getDestinationMap(activeMQDestination).get(activeMQDestination); + if (destination != null) { destination.getDestinationStatistics().getMessages().decrement(); destination.getDestinationStatistics().getEnqueues().decrement(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java new file mode 100644 index 0000000000..64d57a5342 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ5212Test { + + BrokerService brokerService; + + @Before + public void setUp() throws Exception { + start(true); + } + + public void start(boolean deleteAllMessages) throws Exception { + brokerService = new BrokerService(); + if (deleteAllMessages) { + brokerService.deleteAllMessages(); + } + brokerService.addConnector("tcp://localhost:0"); + brokerService.setAdvisorySupport(false); + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + } + + @Test + public void verifyDuplicateSuppressionWithConsumer() throws Exception { + doVerifyDuplicateSuppression(100, 100, true); + } + + @Test + public void verifyDuplicateSuppression() throws Exception { + doVerifyDuplicateSuppression(100, 100, false); + } + + public void doVerifyDuplicateSuppression(final int numToSend, final int expectedTotalEnqueue, final boolean demand) throws Exception { + final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + final int concurrency = 40; + final AtomicInteger workCount = new AtomicInteger(numToSend); + ExecutorService executorService = Executors.newFixedThreadPool(concurrency); + for (int i = 0; i < concurrency; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + int i; + while ((i = workCount.getAndDecrement()) > 0) { + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + + AMQ5212Test.class.getSimpleName()); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + if (demand) { + // create demand so page in will happen + activeMQSession.createConsumer(dest); + } + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + activeMQConnection.close(); + + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + TimeUnit.SECONDS.sleep(1); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + + assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount()); + } + + @Test + public void verifyConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + activeMQConnection.close(); + + // verify original can be consumed after restart + brokerService.stop(); + brokerService.start(false); + + connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + + activeMQConnection.close(); + } + + @Test + public void verifyClientAckConsumptionOnDuplicate() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setWatchTopicAdvisories(false); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.start(); + ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQQueue dest = new ActiveMQQueue("Q"); + + MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); + + ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setDestination(dest); + activeMQMessageProducer.send(message, null); + + // send a duplicate + activeMQConnection.syncSendPacket(message); + + + Message received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + messageConsumer.close(); + + + messageConsumer = activeMQSession.createConsumer(dest); + received = messageConsumer.receive(4000); + assertNotNull("Got message", received); + assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); + received.acknowledge(); + + activeMQConnection.close(); + } +}