diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index f7dc745bd7..459501cdfb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -785,7 +785,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br case ConsumerInfo.DATA_STRUCTURE_TYPE: localStartedLatch.await(); if (started.get()) { - addConsumerInfo((ConsumerInfo) command); + final ConsumerInfo consumerInfo = (ConsumerInfo) command; + if (isDuplicateSuppressionOff(consumerInfo)) { + addConsumerInfo(consumerInfo); + } else { + synchronized (brokerService.getVmConnectorURI()) { + addConsumerInfo(consumerInfo); + } + } } else { // received a subscription whilst stopping LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); @@ -867,8 +874,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // in a cyclic network there can be multiple bridges per broker that can propagate // a network subscription so there is a need to synchronize on a shared entity - synchronized (brokerService.getVmConnectorURI()) { + // if duplicate suppression is required + if (isDuplicateSuppressionOff(info)) { addConsumerInfo(info); + } else { + synchronized (brokerService.getVmConnectorURI()) { + addConsumerInfo(info); + } } } else if (data.getClass() == DestinationInfo.class) { // It's a destination info - we want to pass up information about temporary destinations @@ -1027,8 +1039,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void addSubscription(DemandSubscription sub) throws IOException { if (sub != null) { - if (isDuplex()) { - // async vm transport, need to wait for completion + if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) { + // async vm transport on duplex end, need to wait for completion localBroker.request(sub.getLocalInfo()); } else { localBroker.oneway(sub.getLocalInfo()); @@ -1332,8 +1344,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); boolean suppress = false; - if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() - && !configuration.isSuppressDuplicateTopicSubscriptions()) { + if (isDuplicateSuppressionOff(consumerInfo)) { return suppress; } @@ -1355,6 +1366,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } + private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) { + return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions() + || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() + || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions(); + } + private boolean isInActiveDurableSub(Subscription sub) { return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java index 48c5cbb937..f901d3d7eb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java @@ -496,10 +496,10 @@ public class AMQ3274Test { if (queue_f) { prefix = "queue"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); } else { prefix = "topic"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); } excludes = new ArrayList(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java new file mode 100644 index 0000000000..9919ec9a73 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkBridge; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.TestUtils; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.net.InetAddress; +import java.net.Socket; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertTrue; + +// https://issues.apache.org/jira/browse/AMQ-6640 +public class DuplexAdvisoryRaceTest { + private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class); + private static String hostName; + + final AtomicLong responseReceived = new AtomicLong(0); + + BrokerService brokerA,brokerB; + String networkConnectorUrlString; + + @BeforeClass + public static void initIp() throws Exception { + // attempt to bypass loopback - not vital but it helps to reproduce + hostName = InetAddress.getLocalHost().getHostAddress(); + } + + @Before + public void createBrokers() throws Exception { + networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort(); + + brokerA = newBroker("A"); + brokerB = newBroker("B"); + responseReceived.set(0); + } + + @After + public void stopBrokers() throws Exception { + brokerA.stop(); + brokerB.stop(); + } + + + // to be sure to be sure + public void repeatTestHang() throws Exception { + for (int i=0; i<10;i++) { + testHang(); + stopBrokers(); + createBrokers(); + } + } + + @Test + public void testHang() throws Exception { + + brokerA.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { + @Override + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + Subscription subscription = super.addConsumer(context, info); + // delay return to allow dispatch to interleave + if (context.isNetworkConnection()) { + TimeUnit.MILLISECONDS.sleep(300); + } + return subscription; + }; + }}); + + // bridge + NetworkConnector networkConnector = bridgeBrokers(brokerA, brokerB); + + brokerA.start(); + brokerB.start(); + + ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + "?jms.watchTopicAdvisories=false"); + + ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + "?jms.watchTopicAdvisories=false"); + + // populate dests + final int numDests = 200; + final int numMessagesPerDest = 300; + final int numConsumersPerDest = 100; + populate(brokerAFactory, 0, numDests/2, numMessagesPerDest); + populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest); + + // demand + List connections = new LinkedList<>(); + connections.add(demand(brokerBFactory, 0, numDests/2, numConsumersPerDest)); + connections.add(demand(brokerAFactory, numDests/2, numDests, numConsumersPerDest)); + + + LOG.info("Allow duplex bridge to connect...."); + // allow bridge to start + brokerB.startTransportConnector(brokerB.addConnector(networkConnectorUrlString + "?transport.socketBufferSize=1024")); + + if (!Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("received: " + responseReceived.get()); + return responseReceived.get() >= numMessagesPerDest * numDests; + } + }, 2*60*1000)) { + + org.apache.activemq.TestSupport.dumpAllThreads("DD"); + + // when hung close will also hang! + for (NetworkBridge networkBridge : networkConnector.activeBridges()) { + if (networkBridge instanceof DemandForwardingBridge) { + DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge) networkBridge; + Socket socket = demandForwardingBridge.getRemoteBroker().narrow(Socket.class); + socket.close(); + } + } + } + + networkConnector.stop(); + for (Connection connection: connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + assertTrue("received all sent: " + responseReceived.get(), responseReceived.get() >= numMessagesPerDest * numDests); + } + + + private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final BytesMessage message = session.createBytesMessage(); + //message.writeBytes(new byte[50]); + MessageProducer producer = session.createProducer(null);; + for (int i=minDest; i