git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1291465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-02-20 21:47:59 +00:00
parent e32ff5c7f4
commit 3221b16221
1 changed files with 63 additions and 28 deletions

View File

@ -16,29 +16,6 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import javax.jms.JMSException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.*;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
@ -52,10 +29,66 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.NetworkBridgeFilter;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A useful base class for implementing demand forwarding bridges. * A useful base class for implementing demand forwarding bridges.
*
*
*/ */
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
@ -339,11 +372,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ss.throwFirstException(); ss.throwFirstException();
} }
} }
if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
} }
} }
}
public void serviceRemoteException(Throwable error) { public void serviceRemoteException(Throwable error) {
if (!disposed.get()) { if (!disposed.get()) {