diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 17c5965d9f..50b0aedb86 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -25,9 +25,12 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -112,7 +115,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected int demandConsumerDispatched; protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); - protected AtomicBoolean disposed = new AtomicBoolean(); + protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); + protected final AtomicBoolean disposed = new AtomicBoolean(); protected BrokerId localBrokerId; protected ActiveMQDestination[] excludedDestinations; protected ActiveMQDestination[] dynamicallyIncludedDestinations; @@ -128,7 +132,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; - protected Object brokerInfoMutex = new Object(); protected BrokerId remoteBrokerId; final AtomicLong enqueueCounter = new AtomicLong(); @@ -139,6 +142,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private BrokerInfo localBrokerInfo; private BrokerInfo remoteBrokerInfo; + private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); + private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); + private final AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; private BrokerService brokerService = null; @@ -197,9 +203,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br @Override public void onException(IOException error) { + if (!futureLocalBrokerInfo.isDone()) { + futureLocalBrokerInfo.cancel(true); + return; + } serviceLocalException(error); } }); + remoteBroker.setTransportListener(new DefaultTransportListener() { @Override @@ -210,16 +221,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br @Override public void onException(IOException error) { + if (!futureRemoteBrokerInfo.isDone()) { + futureRemoteBrokerInfo.cancel(true); + return; + } serviceRemoteException(error); } - }); - localBroker.start(); remoteBroker.start(); + localBroker.start(); + if (!disposed.get()) { try { - triggerRemoteStartBridge(); + triggerStartAsyncNetworkBridgeCreation(); } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } @@ -230,16 +245,92 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - protected void triggerLocalStartBridge() throws IOException { + @Override + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + if (disposed.compareAndSet(false, true)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); + } + + futureRemoteBrokerInfo.cancel(true); + futureLocalBrokerInfo.cancel(true); + + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null) { + l.onStop(this); + } + try { + remoteBridgeStarted.set(false); + final CountDownLatch sendShutdown = new CountDownLatch(1); + + brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override + public void run() { + try { + serialExecutor.shutdown(); + if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + List pendingTasks = serialExecutor.shutdownNow(); + if (LOG.isInfoEnabled()) { + LOG.info("pending tasks on stop" + pendingTasks); + } + } + localBroker.oneway(new ShutdownInfo()); + remoteBroker.oneway(new ShutdownInfo()); + } catch (Throwable e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception sending shutdown", e); + } + } finally { + sendShutdown.countDown(); + } + + } + }, "ActiveMQ ForwardingBridge StopTask"); + + if (!sendShutdown.await(10, TimeUnit.SECONDS)) { + LOG.info("Network Could not shutdown in a timely manner"); + } + } finally { + ServiceStopper ss = new ServiceStopper(); + ss.stop(remoteBroker); + ss.stop(localBroker); + ss.stop(duplexInboundLocalBroker); + // Release the started Latch since another thread could be + // stuck waiting for it to start up. + startedLatch.countDown(); + startedLatch.countDown(); + localStartedLatch.countDown(); + + ss.throwFirstException(); + } + } + + if (remoteBrokerInfo != null) { + brokerService.getBroker().removeBroker(null, remoteBrokerInfo); + brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); + if (LOG.isInfoEnabled()) { + LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); + } + } + } + } + + protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { brokerService.getTaskRunnerFactory().execute(new Runnable() { @Override public void run() { final String originalName = Thread.currentThread().getName(); - Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); + Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + + "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); + try { - startLocalBridge(); - } catch (Throwable e) { - serviceLocalException(e); + // First we collect the info data from both the local and remote ends + collectBrokerInfos(); + + // Once we have all required broker info we can attempt to start + // the local and then remote sides of the bridge. + doStartLocalAndRemoteBridges(); } finally { Thread.currentThread().setName(originalName); } @@ -247,21 +338,90 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }); } - protected void triggerRemoteStartBridge() throws IOException { - brokerService.getTaskRunnerFactory().execute(new Runnable() { - @Override - public void run() { - final String originalName = Thread.currentThread().getName(); - Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); - try { - startRemoteBridge(); - } catch (Exception e) { - serviceRemoteException(e); - } finally { - Thread.currentThread().setName(originalName); - } + private void collectBrokerInfos() { + + // First wait for the remote to feed us its BrokerInfo, then we can check on + // the LocalBrokerInfo and decide is this is a loop. + try { + remoteBrokerInfo = futureRemoteBrokerInfo.get(); + if (remoteBrokerInfo == null) { + fireBridgeFailed(); } - }); + } catch (Exception e) { + serviceRemoteException(e); + return; + } + + try { + localBrokerInfo = futureLocalBrokerInfo.get(); + if (localBrokerInfo == null) { + fireBridgeFailed(); + } + + // Before we try and build the bridge lets check if we are in a loop + // and if so just stop now before registering anything. + if (localBrokerId.equals(remoteBrokerId)) { + if (LOG.isTraceEnabled()) { + LOG.trace(configuration.getBrokerName() + + " disconnecting remote loop back connection for: " + + remoteBrokerName + ", with id:" + remoteBrokerId); + } + ServiceSupport.dispose(localBroker); + ServiceSupport.dispose(remoteBroker); + return; + } + + // Fill in the remote broker's information now. + remoteBrokerId = remoteBrokerInfo.getBrokerId(); + remoteBrokerPath[0] = remoteBrokerId; + remoteBrokerName = remoteBrokerInfo.getBrokerName(); + } catch (Throwable e) { + serviceLocalException(e); + } + } + + private void doStartLocalAndRemoteBridges() { + try { + startLocalBridge(); + } catch (Throwable e) { + serviceLocalException(e); + return; + } + + try { + + if (disposed.get()) { + return; + } + + Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); + try { + IntrospectionSupport.getProperties(configuration, props, null); + if (configuration.getExcludedDestinations() != null) { + excludedDestinations = configuration.getExcludedDestinations().toArray( + new ActiveMQDestination[configuration.getExcludedDestinations().size()]); + } + if (configuration.getStaticallyIncludedDestinations() != null) { + staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); + } + if (configuration.getDynamicallyIncludedDestinations() != null) { + dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); + } + } catch (Throwable t) { + LOG.error("Error mapping remote destinations", t); + } + + // Let the local broker know the remote broker's ID. + localBroker.oneway(remoteBrokerInfo); + // new peer broker (a consumer can work with remote broker also) + brokerService.getBroker().addBroker(null, remoteBrokerInfo); + + startRemoteBridge(); + } catch (Throwable e) { + serviceRemoteException(e); + } } private void startLocalBridge() throws Throwable { @@ -334,8 +494,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localStartedLatch.countDown(); } - safeWaitUntilStarted(); - if (!disposed.get()) { setupStaticDestinations(); } else { @@ -399,73 +557,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - @Override - public void stop() throws Exception { - if (started.compareAndSet(true, false)) { - if (disposed.compareAndSet(false, true)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName); - } - NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { - l.onStop(this); - } - try { - remoteBridgeStarted.set(false); - final CountDownLatch sendShutdown = new CountDownLatch(1); - - brokerService.getTaskRunnerFactory().execute(new Runnable() { - @Override - public void run() { - try { - serialExecutor.shutdown(); - if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - List pendingTasks = serialExecutor.shutdownNow(); - if (LOG.isInfoEnabled()) { - LOG.info("pending tasks on stop" + pendingTasks); - } - } - localBroker.oneway(new ShutdownInfo()); - remoteBroker.oneway(new ShutdownInfo()); - } catch (Throwable e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception sending shutdown", e); - } - } finally { - sendShutdown.countDown(); - } - - } - }, "ActiveMQ ForwardingBridge StopTask"); - - if (!sendShutdown.await(10, TimeUnit.SECONDS)) { - LOG.info("Network Could not shutdown in a timely manner"); - } - } finally { - ServiceStopper ss = new ServiceStopper(); - ss.stop(remoteBroker); - ss.stop(localBroker); - ss.stop(duplexInboundLocalBroker); - // Release the started Latch since another thread could be - // stuck waiting for it to start up. - startedLatch.countDown(); - startedLatch.countDown(); - localStartedLatch.countDown(); - - ss.throwFirstException(); - } - } - - if (remoteBrokerInfo != null) { - brokerService.getBroker().removeBroker(null, remoteBrokerInfo); - brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); - if (LOG.isInfoEnabled()) { - LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); - } - } - } - } - @Override public void serviceRemoteException(Throwable error) { if (!disposed.get()) { @@ -494,31 +585,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); ackAdvisory(md.getMessage()); } else if (command.isBrokerInfo()) { - lastConnectSucceeded.set(true); - remoteBrokerInfo = (BrokerInfo) command; - Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); - try { - IntrospectionSupport.getProperties(configuration, props, null); - if (configuration.getExcludedDestinations() != null) { - excludedDestinations = configuration.getExcludedDestinations().toArray( - new ActiveMQDestination[configuration.getExcludedDestinations().size()]); - } - if (configuration.getStaticallyIncludedDestinations() != null) { - staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( - new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); - } - if (configuration.getDynamicallyIncludedDestinations() != null) { - dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( - new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); - } - } catch (Throwable t) { - LOG.error("Error mapping remote destinations", t); - } - serviceRemoteBrokerInfo(command); - // Let the local broker know the remote broker's ID. - localBroker.oneway(command); - // new peer broker (a consumer can work with remote broker also) - brokerService.getBroker().addBroker(null, remoteBrokerInfo); + futureRemoteBrokerInfo.set((BrokerInfo) command); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); @@ -944,8 +1011,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } else if (command.isBrokerInfo()) { - localBrokerInfo = (BrokerInfo) command; - serviceLocalBrokerInfo(command); + futureLocalBrokerInfo.set((BrokerInfo) command); } else if (command.isShutdownInfo()) { LOG.info(configuration.getBrokerName() + " Shutting down"); stop(); @@ -967,42 +1033,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { - synchronized (brokerInfoMutex) { - if (remoteBrokerId != null) { - if (remoteBrokerId.equals(localBrokerId)) { - if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" - + remoteBrokerId); - } - safeWaitUntilStarted(); - ServiceSupport.dispose(this); - } - } - } - } - - protected void serviceRemoteBrokerInfo(Command command) throws IOException { - synchronized (brokerInfoMutex) { - BrokerInfo remoteBrokerInfo = (BrokerInfo) command; - remoteBrokerId = remoteBrokerInfo.getBrokerId(); - remoteBrokerPath[0] = remoteBrokerId; - remoteBrokerName = remoteBrokerInfo.getBrokerName(); - if (localBrokerId != null) { - if (localBrokerId.equals(remoteBrokerId)) { - if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" - + remoteBrokerId); - } - ServiceSupport.dispose(this); - } - } - if (!disposed.get()) { - triggerLocalStartBridge(); - } - } - } - private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { boolean suppress = false; // for durable subs, suppression via filter leaves dangling acks so we @@ -1387,7 +1417,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private void fireBridgeFailed() { NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { + if (l != null && this.bridgeFailed.compareAndSet(false, true)) { l.bridgeFailed(); } } @@ -1535,4 +1565,83 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public ObjectName getMbeanObjectName() { return mbeanObjectName; } + + /* + * Used to allow for async tasks to await receipt of the BrokerInfo from the local and + * remote sides of the network bridge. + */ + private static class FutureBrokerInfo implements Future { + + private final CountDownLatch slot = new CountDownLatch(1); + private final AtomicBoolean disposed; + private BrokerInfo info = null; + + public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { + this.info = info; + this.disposed = disposed; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + slot.countDown(); + return true; + } + + @Override + public boolean isCancelled() { + return slot.getCount() == 0 && info == null; + } + + @Override + public boolean isDone() { + return info != null; + } + + @Override + public BrokerInfo get() throws InterruptedException, ExecutionException { + try { + if (info == null) { + while (!disposed.get()) { + if (slot.await(1, TimeUnit.SECONDS)) { + break; + } + } + } + return info; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Operation interupted: " + e, e); + } + throw new InterruptedException("Interrupted."); + } + } + + @Override + public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + try { + if (info == null) { + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + + while (!disposed.get() || System.currentTimeMillis() < deadline) { + if (slot.await(1, TimeUnit.MILLISECONDS)) { + break; + } + } + if (info == null) { + throw new TimeoutException(); + } + } + return info; + } catch (InterruptedException e) { + throw new InterruptedException("Interrupted."); + } + } + + public void set(BrokerInfo info) { + this.info = info; + this.slot.countDown(); + } + } + }