[AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression which should only be necessary in ring topologies when properly configured

This commit is contained in:
gtully 2017-03-28 14:49:23 +01:00
parent 0196be1d27
commit 8e00c6c2bc
3 changed files with 272 additions and 8 deletions

View File

@ -785,7 +785,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
addConsumerInfo((ConsumerInfo) command);
final ConsumerInfo consumerInfo = (ConsumerInfo) command;
if (isDuplicateSuppressionOff(consumerInfo)) {
addConsumerInfo(consumerInfo);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(consumerInfo);
}
}
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
@ -867,8 +874,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronize on a shared entity
synchronized (brokerService.getVmConnectorURI()) {
// if duplicate suppression is required
if (isDuplicateSuppressionOff(info)) {
addConsumerInfo(info);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(info);
}
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations
@ -1027,8 +1039,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
if (isDuplex()) {
// async vm transport, need to wait for completion
if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) {
// async vm transport on duplex end, need to wait for completion
localBroker.request(sub.getLocalInfo());
} else {
localBroker.oneway(sub.getLocalInfo());
@ -1332,8 +1344,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
&& !configuration.isSuppressDuplicateTopicSubscriptions()) {
if (isDuplicateSuppressionOff(consumerInfo)) {
return suppress;
}
@ -1355,6 +1366,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return suppress;
}
private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
|| consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
|| consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
}
private boolean isInActiveDurableSub(Subscription sub) {
return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
}

View File

@ -496,10 +496,10 @@ public class AMQ3274Test {
if (queue_f) {
prefix = "queue";
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
} else {
prefix = "topic";
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
}
excludes = new ArrayList<ActiveMQDestination>();

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertTrue;
// https://issues.apache.org/jira/browse/AMQ-6640
public class DuplexAdvisoryRaceTest {
private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class);
private static String hostName;
final AtomicLong responseReceived = new AtomicLong(0);
BrokerService brokerA,brokerB;
String networkConnectorUrlString;
@BeforeClass
public static void initIp() throws Exception {
// attempt to bypass loopback - not vital but it helps to reproduce
hostName = InetAddress.getLocalHost().getHostAddress();
}
@Before
public void createBrokers() throws Exception {
networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort();
brokerA = newBroker("A");
brokerB = newBroker("B");
responseReceived.set(0);
}
@After
public void stopBrokers() throws Exception {
brokerA.stop();
brokerB.stop();
}
// to be sure to be sure
public void repeatTestHang() throws Exception {
for (int i=0; i<10;i++) {
testHang();
stopBrokers();
createBrokers();
}
}
@Test
public void testHang() throws Exception {
brokerA.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription subscription = super.addConsumer(context, info);
// delay return to allow dispatch to interleave
if (context.isNetworkConnection()) {
TimeUnit.MILLISECONDS.sleep(300);
}
return subscription;
};
}});
// bridge
NetworkConnector networkConnector = bridgeBrokers(brokerA, brokerB);
brokerA.start();
brokerB.start();
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString()
+ "?jms.watchTopicAdvisories=false");
ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString()
+ "?jms.watchTopicAdvisories=false");
// populate dests
final int numDests = 200;
final int numMessagesPerDest = 300;
final int numConsumersPerDest = 100;
populate(brokerAFactory, 0, numDests/2, numMessagesPerDest);
populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest);
// demand
List<Connection> connections = new LinkedList<>();
connections.add(demand(brokerBFactory, 0, numDests/2, numConsumersPerDest));
connections.add(demand(brokerAFactory, numDests/2, numDests, numConsumersPerDest));
LOG.info("Allow duplex bridge to connect....");
// allow bridge to start
brokerB.startTransportConnector(brokerB.addConnector(networkConnectorUrlString + "?transport.socketBufferSize=1024"));
if (!Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("received: " + responseReceived.get());
return responseReceived.get() >= numMessagesPerDest * numDests;
}
}, 2*60*1000)) {
org.apache.activemq.TestSupport.dumpAllThreads("DD");
// when hung close will also hang!
for (NetworkBridge networkBridge : networkConnector.activeBridges()) {
if (networkBridge instanceof DemandForwardingBridge) {
DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge) networkBridge;
Socket socket = demandForwardingBridge.getRemoteBroker().narrow(Socket.class);
socket.close();
}
}
}
networkConnector.stop();
for (Connection connection: connections) {
try {
connection.close();
} catch (Exception ignored) {}
}
assertTrue("received all sent: " + responseReceived.get(), responseReceived.get() >= numMessagesPerDest * numDests);
}
private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException {
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final BytesMessage message = session.createBytesMessage();
//message.writeBytes(new byte[50]);
MessageProducer producer = session.createProducer(null);;
for (int i=minDest; i<maxDest; i++) {
Destination destination = qFromInt(i);
for (int j=0; j<numMessages; j++) {
producer.send(destination, message);
}
}
connection.close();
}
private Connection demand(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numConsumers) throws Exception {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i=minDest; i<maxDest; i++) {
Destination destination = qFromInt(i);
for (int j=0; j<numConsumers; j++) {
session.createConsumer(destination).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
responseReceived.incrementAndGet();
}
});
}
}
connection.start();
return connection;
}
private Destination qFromInt(int val) {
StringBuilder builder = new StringBuilder();
String digits = String.format("%03d", val);
for (int i=0; i<3; i++) {
builder.append(digits.charAt(i));
if (i < 2) {
builder.append('.');
}
}
return new ActiveMQQueue("Test." + builder.toString());
}
private BrokerService newBroker(String name) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
brokerService.setBrokerName(name);
brokerService.addConnector("tcp://" + hostName + ":0?transport.socketBufferSize=1024");
PolicyMap map = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(0);
map.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(map);
return brokerService;
}
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024)?maxReconnectAttempts=0)";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());
connector.setDuplex(true);
localBroker.addNetworkConnector(connector);
return connector;
}
}