[AMQ-6640] allign use of sync vm transport usage on duplex end of networkconnector with initiator end. only duplexinbound for forwarding is async to allow thread for responses. vm transport options applied in one place and test more deterministic w.r.t the hang scenario

This commit is contained in:
gtully 2017-03-29 23:50:47 +01:00
parent 770a73e9ba
commit d84a58656c
8 changed files with 35 additions and 30 deletions

View File

@ -2617,12 +2617,6 @@ public class BrokerService implements Service {
setTransportConnectors(al);
}
this.slave = false;
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<>(URISupport.parseParameters(uri));
map.put("async", "false");
map.put("create","false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
if (!stopped.get()) {
ThreadPoolExecutor networkConnectorStartExecutor = null;
if (isNetworkConnectorStartAsync()) {
@ -2642,7 +2636,7 @@ public class BrokerService implements Service {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
final NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setLocalUri(getVmConnectorURI());
startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor);
}
if (networkConnectorStartExecutor != null) {

View File

@ -1428,7 +1428,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
}
Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker.getVmConnectorURI());
Transport remoteBridgeTransport = transport;
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
// the vm transport case is already wrapped

View File

@ -193,7 +193,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
if (isDuplex()) {
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
@Override
@ -830,9 +830,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
demandConsumerDispatched++;
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
(configuration.getAdvisoryAckPercentage() / 100f))) {
MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
ack.setConsumerId(demandConsumerInfo.getConsumerId());
remoteBroker.oneway(ack);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
remoteBroker.oneway(ack);
} catch (IOException e) {
LOG.warn("Failed to send advisory ack " + ack, e);
}
}
});
demandConsumerDispatched = 0;
}
}
@ -1039,12 +1048,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) {
// async vm transport on duplex end, need to wait for completion
localBroker.request(sub.getLocalInfo());
} else {
localBroker.oneway(sub.getLocalInfo());
}
localBroker.oneway(sub.getLocalInfo());
}
}

View File

@ -58,10 +58,17 @@ public final class NetworkBridgeFactory {
return result;
}
public static Transport createLocalTransport(Broker broker) throws Exception {
URI uri = broker.getVmConnectorURI();
public static Transport createLocalTransport(URI uri) throws Exception {
return createLocalTransport(uri, false);
}
public static Transport createLocalAsyncTransport(URI uri) throws Exception {
return createLocalTransport(uri, true);
}
private static Transport createLocalTransport(URI uri, boolean async) throws Exception {
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("async", "true");
map.put("async", String.valueOf(async));
map.put("create", "false"); // we don't want a vm connect during shutdown to trigger a broker create
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
return TransportFactory.connect(uri);

View File

@ -140,7 +140,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
}
protected Transport createLocalTransport() throws Exception {
return TransportFactory.connect(localURI);
return NetworkBridgeFactory.createLocalTransport(localURI);
}
public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) {

View File

@ -63,7 +63,7 @@ public class NetworkRouteTest {
@Test
public void verifyNoRemoveOnOneConduitRemove() throws Exception {
EasyMock.expect(localBroker.request(EasyMock.isA(ConsumerInfo.class))).andReturn(null);
localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
control.replay();
remoteListener.onCommand(path2Msg);
@ -76,7 +76,7 @@ public class NetworkRouteTest {
@Test
public void addAndRemoveOppositeOrder() throws Exception {
// from (1)
localBroker.request(EasyMock.isA(ConsumerInfo.class));
localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
// from (2a)
remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
@ -123,7 +123,7 @@ public class NetworkRouteTest {
@Test
public void addAndRemoveSameOrder() throws Exception {
// from (1)
localBroker.request(EasyMock.isA(ConsumerInfo.class));
localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
// from (2a)

View File

@ -126,9 +126,9 @@ public class DuplexAdvisoryRaceTest {
+ "?jms.watchTopicAdvisories=false");
// populate dests
final int numDests = 200;
final int numMessagesPerDest = 300;
final int numConsumersPerDest = 100;
final int numDests = 800;
final int numMessagesPerDest = 50;
final int numConsumersPerDest = 5;
populate(brokerAFactory, 0, numDests/2, numMessagesPerDest);
populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest);
@ -148,7 +148,7 @@ public class DuplexAdvisoryRaceTest {
LOG.info("received: " + responseReceived.get());
return responseReceived.get() >= numMessagesPerDest * numDests;
}
}, 2*60*1000)) {
}, 5*60*1000)) {
org.apache.activemq.TestSupport.dumpAllThreads("DD");
@ -177,7 +177,6 @@ public class DuplexAdvisoryRaceTest {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final BytesMessage message = session.createBytesMessage();
//message.writeBytes(new byte[50]);
MessageProducer producer = session.createProducer(null);;
for (int i=minDest; i<maxDest; i++) {
Destination destination = qFromInt(i);
@ -236,7 +235,7 @@ public class DuplexAdvisoryRaceTest {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024)?maxReconnectAttempts=0)";
String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());

View File

@ -24,6 +24,7 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.transport.TransportLogger.Connection=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.store.kahadb=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG