https://issues.apache.org/jira/browse/AMQ-3542 - further simplification of network bridge start, remove latch wait states, add duplex test variant

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1185765 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-18 17:31:27 +00:00
parent 970a97ac5f
commit 174c40579a
6 changed files with 69 additions and 146 deletions

View File

@ -534,7 +534,7 @@ public class BrokerService implements Service {
getBroker().brokerServiceStarted(); getBroker().brokerServiceStarted();
startedLatch.countDown(); startedLatch.countDown();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e); LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
try { try {
if (!stopped.get()) { if (!stopped.get()) {
stop(); stop();

View File

@ -18,14 +18,8 @@ package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -40,71 +34,13 @@ import org.slf4j.LoggerFactory;
public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport { public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class); private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class);
protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) { Transport remoteBroker) {
super(configuration, localBroker, remoteBroker); super(configuration, localBroker, remoteBroker);
remoteBrokerName = remoteBroker.toString(); remoteBrokerName = remoteBroker.toString();
remoteBrokerNameKnownLatch.countDown();
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId();
// lets associate the incoming endpoint with a broker ID so we can
// refer to it later
Endpoint from = command.getFrom();
if (from == null) {
LOG.warn("Incoming command does not have a from endpoint: " + command);
} else {
from.setBrokerInfo(remoteBrokerInfo);
}
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
LOG.info("Disconnecting loop back connection.");
// waitStarted();
ServiceSupport.dispose(this);
}
}
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info)));
}
/**
* Returns the broker ID that the command came from
*/
protected BrokerId getFromBrokerId(Command command) throws IOException {
BrokerId answer = null;
Endpoint from = command.getFrom();
if (from == null) {
LOG.warn("Incoming command does not have a from endpoint: " + command);
} else {
answer = from.getBrokerId();
}
if (answer != null) {
return answer;
} else {
throw new IOException("No broker ID is available for endpoint: " + from + " from command: "
+ command);
}
} }
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
// TODO is there much we can do here? // TODO is there much we can do here?
} }
protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
} }

View File

@ -16,15 +16,7 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.io.IOException;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,58 +30,8 @@ import org.slf4j.LoggerFactory;
public class DemandForwardingBridge extends DemandForwardingBridgeSupport { public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class); private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class);
protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) { Transport remoteBroker) {
super(configuration, localBroker, remoteBroker); super(configuration, localBroker, remoteBroker);
} }
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection: " + remoteBrokerName);
}
ServiceSupport.dispose(this);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("counting down remoteBrokerNameKnownLatch with: " + command);
}
remoteBrokerNameKnownLatch.countDown();
}
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
}
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
synchronized (brokerInfoMutex) {
localBrokerId = ((BrokerInfo)command).getBrokerId();
localBrokerPath[0] = localBrokerId;
localBrokerIdKnownLatch.countDown();
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection.");
}
waitStarted();
ServiceSupport.dispose(this);
}
}
}
}
protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
} }

View File

@ -36,7 +36,6 @@ import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -74,7 +73,6 @@ import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.*; import org.apache.activemq.util.*;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -114,12 +112,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2); protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected CountDownLatch localStartedLatch = new CountDownLatch(1);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration; protected NetworkBridgeConfiguration configuration;
protected NetworkBridgeFilterFactory filterFactory; protected NetworkBridgeFilterFactory filterFactory;
protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
final AtomicLong enqueueCounter = new AtomicLong(); final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong(); final AtomicLong dequeueCounter = new AtomicLong();
@ -222,14 +222,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}); });
} }
protected void startLocalBridge() throws Throwable { private void startLocalBridge() throws Throwable {
if (localBridgeStarted.compareAndSet(false, true)) { if (localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) { synchronized (this) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker); LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
} }
remoteBrokerNameKnownLatch.await();
if (!disposed.get()) { if (!disposed.get()) {
localConnectionInfo = new ConnectionInfo(); localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
@ -277,7 +275,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void startRemoteBridge() throws Exception { protected void startRemoteBridge() throws Exception {
if (remoteBridgeStarted.compareAndSet(false, true)) { if (remoteBridgeStarted.compareAndSet(false, true)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker); LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
} }
synchronized (this) { synchronized (this) {
if (!isCreatedByDuplex()) { if (!isCreatedByDuplex()) {
@ -291,7 +289,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
IntrospectionSupport.getProperties(configuration, props, null); IntrospectionSupport.getProperties(configuration, props, null);
String str = MarshallingSupport.propertiesToString(props); String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str); brokerInfo.setNetworkProperties(str);
localBrokerIdKnownLatch.await();
brokerInfo.setBrokerId(this.localBrokerId); brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo); remoteBroker.oneway(brokerInfo);
} }
@ -322,9 +319,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo); remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown(); startedLatch.countDown();
if (!disposed.get()) {
triggerLocalStartBridge();
}
} }
} }
} }
@ -372,7 +366,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
remoteBrokerNameKnownLatch.countDown();
} }
} }
@ -1161,7 +1154,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void waitStarted() throws InterruptedException { protected void waitStarted() throws InterruptedException {
startedLatch.await(); startedLatch.await();
localBrokerIdKnownLatch.await();
} }
protected void clearDownSubscriptions() { protected void clearDownSubscriptions() {
@ -1184,13 +1176,47 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL() ); return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL() );
} }
protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
synchronized (brokerInfoMutex) {
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
}
waitStarted();
ServiceSupport.dispose(this);
}
}
}
}
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
}
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
}
ServiceSupport.dispose(this);
}
}
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
}
protected abstract BrokerId[] getRemoteBrokerPath(); protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath;
}
public void setNetworkBridgeListener(NetworkBridgeListener listener) { public void setNetworkBridgeListener(NetworkBridgeListener listener) {
this.networkBridgeListener = listener; this.networkBridgeListener = listener;
@ -1233,6 +1259,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void setBrokerService(BrokerService brokerService) { public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService; this.brokerService = brokerService;
this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
localBrokerPath[0] = localBrokerId;
} }
public void setMbeanObjectName(ObjectName objectName) { public void setMbeanObjectName(ObjectName objectName) {

View File

@ -96,10 +96,16 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return; return;
} }
} }
if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) { if (localURI.equals(uri)) {
LOG.debug("not connecting loopback: " + uri); LOG.debug("not connecting loopback: " + uri);
return; return;
} }
if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
LOG.debug("connectionFilter disallows connection to: " + uri);
return;
}
URI connectUri = uri; URI connectUri = uri;
try { try {
connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);

View File

@ -278,7 +278,6 @@ public class FailoverStaticNetworkTest {
brokerC.start(); brokerC.start();
assertTrue("all props applied a second time", networkConnectorProps.isEmpty()); assertTrue("all props applied a second time", networkConnectorProps.isEmpty());
//Thread.sleep(4000);
doTestNetworkSendReceive(brokerC, brokerB); doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC); doTestNetworkSendReceive(brokerB, brokerC);
@ -321,8 +320,20 @@ public class FailoverStaticNetworkTest {
@Test @Test
public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception { public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
doTestRepeatedSendReceiveWithMasterSlaveAlternate(null);
}
brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}); @Test
public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception {
HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
networkConnectorProps.put("duplex", "true");
doTestRepeatedSendReceiveWithMasterSlaveAlternate(networkConnectorProps);
}
public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String> networkConnectorProps) throws Exception {
brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}, networkConnectorProps);
brokerB.start(); brokerB.start();
final AtomicBoolean done = new AtomicBoolean(false); final AtomicBoolean done = new AtomicBoolean(false);
@ -380,7 +391,7 @@ public class FailoverStaticNetworkTest {
} }
}); });
for (int i=0; i<10; i++) { for (int i=0; i<4; i++) {
BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1); BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1);
LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName")); LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
currentMaster.waitUntilStarted(); currentMaster.waitUntilStarted();
@ -392,7 +403,7 @@ public class FailoverStaticNetworkTest {
currentMaster.waitUntilStopped(); currentMaster.waitUntilStopped();
} }
done.set(false); done.set(true);
LOG.info("all done"); LOG.info("all done");
executorService.shutdownNow(); executorService.shutdownNow();
} }