Reduces async work that's don't during network bridge startup. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-30 17:15:05 +00:00
parent a8b6f8bb45
commit f1eb92eaac
1 changed files with 266 additions and 157 deletions

View File

@ -25,9 +25,12 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -112,7 +115,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected AtomicBoolean disposed = new AtomicBoolean();
protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
protected final AtomicBoolean disposed = new AtomicBoolean();
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
@ -128,7 +132,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
final AtomicLong enqueueCounter = new AtomicLong();
@ -139,6 +142,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
@ -197,9 +203,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
@Override
public void onException(IOException error) {
if (!futureLocalBrokerInfo.isDone()) {
futureLocalBrokerInfo.cancel(true);
return;
}
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new DefaultTransportListener() {
@Override
@ -210,16 +221,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
@Override
public void onException(IOException error) {
if (!futureRemoteBrokerInfo.isDone()) {
futureRemoteBrokerInfo.cancel(true);
return;
}
serviceRemoteException(error);
}
});
localBroker.start();
remoteBroker.start();
localBroker.start();
if (!disposed.get()) {
try {
triggerRemoteStartBridge();
triggerStartAsyncNetworkBridgeCreation();
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
@ -230,16 +245,92 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected void triggerLocalStartBridge() throws IOException {
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
if (disposed.compareAndSet(false, true)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
}
futureRemoteBrokerInfo.cancel(true);
futureLocalBrokerInfo.cancel(true);
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStop(this);
}
try {
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
serialExecutor.shutdown();
if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
if (LOG.isInfoEnabled()) {
LOG.info("pending tasks on stop" + pendingTasks);
}
}
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception sending shutdown", e);
}
} finally {
sendShutdown.countDown();
}
}
}, "ActiveMQ ForwardingBridge StopTask");
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely manner");
}
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(remoteBroker);
ss.stop(localBroker);
ss.stop(duplexInboundLocalBroker);
// Release the started Latch since another thread could be
// stuck waiting for it to start up.
startedLatch.countDown();
startedLatch.countDown();
localStartedLatch.countDown();
ss.throwFirstException();
}
}
if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
if (LOG.isInfoEnabled()) {
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
}
}
}
protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
"remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
try {
startLocalBridge();
} catch (Throwable e) {
serviceLocalException(e);
// First we collect the info data from both the local and remote ends
collectBrokerInfos();
// Once we have all required broker info we can attempt to start
// the local and then remote sides of the bridge.
doStartLocalAndRemoteBridges();
} finally {
Thread.currentThread().setName(originalName);
}
@ -247,21 +338,90 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
});
}
protected void triggerRemoteStartBridge() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
try {
startRemoteBridge();
} catch (Exception e) {
serviceRemoteException(e);
} finally {
Thread.currentThread().setName(originalName);
}
private void collectBrokerInfos() {
// First wait for the remote to feed us its BrokerInfo, then we can check on
// the LocalBrokerInfo and decide is this is a loop.
try {
remoteBrokerInfo = futureRemoteBrokerInfo.get();
if (remoteBrokerInfo == null) {
fireBridgeFailed();
}
});
} catch (Exception e) {
serviceRemoteException(e);
return;
}
try {
localBrokerInfo = futureLocalBrokerInfo.get();
if (localBrokerInfo == null) {
fireBridgeFailed();
}
// Before we try and build the bridge lets check if we are in a loop
// and if so just stop now before registering anything.
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() +
" disconnecting remote loop back connection for: " +
remoteBrokerName + ", with id:" + remoteBrokerId);
}
ServiceSupport.dispose(localBroker);
ServiceSupport.dispose(remoteBroker);
return;
}
// Fill in the remote broker's information now.
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
} catch (Throwable e) {
serviceLocalException(e);
}
}
private void doStartLocalAndRemoteBridges() {
try {
startLocalBridge();
} catch (Throwable e) {
serviceLocalException(e);
return;
}
try {
if (disposed.get()) {
return;
}
Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
try {
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote destinations", t);
}
// Let the local broker know the remote broker's ID.
localBroker.oneway(remoteBrokerInfo);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
}
}
private void startLocalBridge() throws Throwable {
@ -334,8 +494,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localStartedLatch.countDown();
}
safeWaitUntilStarted();
if (!disposed.get()) {
setupStaticDestinations();
} else {
@ -399,73 +557,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
if (disposed.compareAndSet(false, true)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
}
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
l.onStop(this);
}
try {
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
serialExecutor.shutdown();
if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
if (LOG.isInfoEnabled()) {
LOG.info("pending tasks on stop" + pendingTasks);
}
}
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception sending shutdown", e);
}
} finally {
sendShutdown.countDown();
}
}
}, "ActiveMQ ForwardingBridge StopTask");
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely manner");
}
} finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(remoteBroker);
ss.stop(localBroker);
ss.stop(duplexInboundLocalBroker);
// Release the started Latch since another thread could be
// stuck waiting for it to start up.
startedLatch.countDown();
startedLatch.countDown();
localStartedLatch.countDown();
ss.throwFirstException();
}
}
if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
if (LOG.isInfoEnabled()) {
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
}
}
}
@Override
public void serviceRemoteException(Throwable error) {
if (!disposed.get()) {
@ -494,31 +585,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
remoteBrokerInfo = (BrokerInfo) command;
Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
try {
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
LOG.error("Error mapping remote destinations", t);
}
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
// new peer broker (a consumer can work with remote broker also)
brokerService.getBroker().addBroker(null, remoteBrokerInfo);
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
@ -944,8 +1011,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
} else if (command.isBrokerInfo()) {
localBrokerInfo = (BrokerInfo) command;
serviceLocalBrokerInfo(command);
futureLocalBrokerInfo.set((BrokerInfo) command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
stop();
@ -967,42 +1033,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
synchronized (brokerInfoMutex) {
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:"
+ remoteBrokerId);
}
safeWaitUntilStarted();
ServiceSupport.dispose(this);
}
}
}
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:"
+ remoteBrokerId);
}
ServiceSupport.dispose(this);
}
}
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
}
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
boolean suppress = false;
// for durable subs, suppression via filter leaves dangling acks so we
@ -1387,7 +1417,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private void fireBridgeFailed() {
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null) {
if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
l.bridgeFailed();
}
}
@ -1535,4 +1565,83 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public ObjectName getMbeanObjectName() {
return mbeanObjectName;
}
/*
* Used to allow for async tasks to await receipt of the BrokerInfo from the local and
* remote sides of the network bridge.
*/
private static class FutureBrokerInfo implements Future<BrokerInfo> {
private final CountDownLatch slot = new CountDownLatch(1);
private final AtomicBoolean disposed;
private BrokerInfo info = null;
public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
this.info = info;
this.disposed = disposed;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
slot.countDown();
return true;
}
@Override
public boolean isCancelled() {
return slot.getCount() == 0 && info == null;
}
@Override
public boolean isDone() {
return info != null;
}
@Override
public BrokerInfo get() throws InterruptedException, ExecutionException {
try {
if (info == null) {
while (!disposed.get()) {
if (slot.await(1, TimeUnit.SECONDS)) {
break;
}
}
}
return info;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) {
LOG.debug("Operation interupted: " + e, e);
}
throw new InterruptedException("Interrupted.");
}
}
@Override
public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
if (info == null) {
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
while (!disposed.get() || System.currentTimeMillis() < deadline) {
if (slot.await(1, TimeUnit.MILLISECONDS)) {
break;
}
}
if (info == null) {
throw new TimeoutException();
}
}
return info;
} catch (InterruptedException e) {
throw new InterruptedException("Interrupted.");
}
}
public void set(BrokerInfo info) {
this.info = info;
this.slot.countDown();
}
}
}