From 5a429d90bbfeb27d012127cc5d03f67b0ef7ed98 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 31 Jan 2006 16:35:13 +0000 Subject: [PATCH] Fixes for networks and the invalid Brokers caper git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@373863 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 6 +- .../activemq/broker/region/RegionBroker.java | 2 +- .../apache/activemq/command/BaseCommand.java | 4 + .../org/apache/activemq/command/Command.java | 1 + .../activemq/command/KeepAliveInfo.java | 4 + .../apache/activemq/command/ShutdownInfo.java | 4 + .../activemq/command/WireFormatInfo.java | 4 + .../network/DemandForwardingBridge.java | 560 +++++++++--------- .../activemq/network/NetworkConnector.java | 22 +- .../transport/vm/VMTransportFactory.java | 207 ++++--- .../network/DemandForwardingBridgeTest.java | 2 +- 11 files changed, 426 insertions(+), 390 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 4062683dec..52b80a6fe4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -47,6 +47,7 @@ import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.ServiceStopper; @@ -190,7 +191,7 @@ public class BrokerService implements Service { * @throws Exception */ public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception{ - NetworkConnector connector=new NetworkConnector(); + NetworkConnector connector=new NetworkConnector(this); // add the broker name to the parameters if not set connector.setUri(discoveryAddress); return addNetworkConnector(connector); @@ -219,7 +220,6 @@ public class BrokerService implements Service { map.put("network", "true"); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); connector.setLocalUri(uri); - connector.setBrokerName(getBrokerName()); networkConnectors.add(connector); if (isUseJmx()) { registerNetworkConnectorMBean(connector); @@ -356,6 +356,8 @@ public class BrokerService implements Service { } log.info("ActiveMQ Message Broker (" + getBrokerName() + ") is shutting down"); BrokerRegistry.getInstance().unbind(getBrokerName()); + //remove any VMTransports connected + VMTransportFactory.stopped(getBrokerName()); removeShutdownHook(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index d5946eaca2..d7401b3d27 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -147,7 +147,7 @@ public class RegionBroker implements Broker { } synchronized (clientIdSet ) { if (clientIdSet.containsKey(clientId)) { - throw new InvalidClientIDException("Client: " + clientId + " already connected"); + throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected"); } else { clientIdSet.put(clientId, info); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java index 2cfdae54a3..402c3d68a4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java @@ -64,6 +64,10 @@ abstract public class BaseCommand implements Command { public boolean isMessageDispatchNotification(){ return false; } + + public boolean isShutdownInfo(){ + return false; + } /** * @openwire:property version=1 diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Command.java b/activemq-core/src/main/java/org/apache/activemq/command/Command.java index e6320e08ec..6d08e79bed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Command.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Command.java @@ -43,6 +43,7 @@ public interface Command extends DataStructure { boolean isMessage(); boolean isMessageAck(); boolean isMessageDispatchNotification(); + boolean isShutdownInfo(); Response visit( CommandVisitor visitor) throws Throwable; } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java index 011d354c38..27bf6d50e6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java @@ -79,5 +79,9 @@ public class KeepAliveInfo implements Command { public boolean isMessageDispatchNotification(){ return false; } + + public boolean isShutdownInfo(){ + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java index 6ba26fe07b..e325f4cd04 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ShutdownInfo.java @@ -34,6 +34,10 @@ public class ShutdownInfo extends BaseCommand { public Response visit(CommandVisitor visitor) throws Throwable { return visitor.processShutdown( this ); } + + public boolean isShutdownInfo(){ + return true; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java index ad0fa14c9b..9b87c9870e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -172,5 +172,9 @@ public class WireFormatInfo implements Command { public boolean isMessageDispatchNotification(){ return false; } + + public boolean isShutdownInfo(){ + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java index 2927a42275..bce6fb0e72 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java @@ -1,26 +1,22 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.network; import java.io.IOException; - import javax.jms.JMSException; - import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -53,393 +49,415 @@ import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; - +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** - * Forwards messages from the local broker to the remote broker based on - * demand. + * Forwards messages from the local broker to the remote broker based on demand. * * @org.xbean.XBean * * @version $Revision$ */ -public class DemandForwardingBridge implements Bridge { - - static final private Log log = LogFactory.getLog(DemandForwardingBridge.class); - +public class DemandForwardingBridge implements Bridge{ + static final private Log log=LogFactory.getLog(DemandForwardingBridge.class); private final Transport localBroker; private final Transport remoteBroker; - - IdGenerator idGenerator = new IdGenerator(); - LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); - - ConnectionInfo connectionInfo; - SessionInfo sessionInfo; - ProducerInfo producerInfo; - - private String clientId; + private IdGenerator idGenerator=new IdGenerator(); + private LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator(); + private ConnectionInfo localConnectionInfo; + private ConnectionInfo remoteConnectionInfo; + private SessionInfo localSessionInfo; + private ProducerInfo producerInfo; + private String localBrokerName; + private String remoteBrokerName; + private String localClientId; private int prefetchSize=1000; private boolean dispatchAsync; - private String destinationFilter = ">"; - + private String destinationFilter=">"; private ConsumerInfo demandConsumerInfo; private int demandConsumerDispatched; - + private AtomicBoolean localBridgeStarted=new AtomicBoolean(false); + private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false); + private boolean disposed=false; BrokerId localBrokerId; BrokerId remoteBrokerId; - - private static class DemandSubscription { + private static class DemandSubscription{ ConsumerInfo remoteInfo; ConsumerInfo localInfo; int dispatched; - - public DemandSubscription(ConsumerInfo info) { - remoteInfo = info; - localInfo = info.copy(); + + public DemandSubscription(ConsumerInfo info){ + remoteInfo=info; + localInfo=info.copy(); } } - - ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); - ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); - - protected final BrokerId localBrokerPath[] = new BrokerId[] {null}; - protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null}; - - public DemandForwardingBridge(Transport localBroker, Transport remoteBroker) { - this.localBroker = localBroker; - this.remoteBroker = remoteBroker; + ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap(); + ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap(); + protected final BrokerId localBrokerPath[]=new BrokerId[] { null }; + protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null }; + + public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){ + this.localBroker=localBroker; + this.remoteBroker=remoteBroker; } - public void start() throws Exception { - log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established."); - + public void start() throws Exception{ + log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); localBroker.setTransportListener(new TransportListener(){ - public void onCommand(Command command) { + public void onCommand(Command command){ serviceLocalCommand(command); } - public void onException(IOException error) { + + public void onException(IOException error){ serviceLocalException(error); } }); - remoteBroker.setTransportListener(new TransportListener(){ - public void onCommand(Command command) { + public void onCommand(Command command){ serviceRemoteCommand(command); } - public void onException(IOException error) { + + public void onException(IOException error){ serviceRemoteException(error); } }); - localBroker.start(); remoteBroker.start(); - + triggerRemoteStartBridge(); } - protected void triggerStartBridge() throws IOException { - Thread thead = new Thread() { - public void run() { - try { - startBridge(); - } - catch (IOException e) { - log.error("Failed to start network bridge: " + e, e); + protected void triggerLocalStartBridge() throws IOException{ + Thread thead=new Thread(){ + public void run(){ + try{ + startLocalBridge(); + }catch(IOException e){ + log.error("Failed to start network bridge: "+e,e); } } }; thead.start(); } - - protected void startBridge() throws IOException { - BrokerInfo brokerInfo = new BrokerInfo(); - remoteBroker.oneway(brokerInfo); - connectionInfo = new ConnectionInfo(); - connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - connectionInfo.setClientId(clientId); - localBroker.oneway(connectionInfo); - remoteBroker.oneway(connectionInfo); - sessionInfo=new SessionInfo(connectionInfo, 1); - localBroker.oneway(sessionInfo); - remoteBroker.oneway(sessionInfo); - - producerInfo = new ProducerInfo(sessionInfo, 1); - producerInfo.setResponseRequired(false); - remoteBroker.oneway(producerInfo); - - // Listen to consumer advisory messages on the remote broker to determine demand. - demandConsumerInfo = new ConsumerInfo(sessionInfo, 1); - demandConsumerInfo.setDispatchAsync(dispatchAsync); - demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter)); - demandConsumerInfo.setPrefetchSize(prefetchSize); - remoteBroker.oneway(demandConsumerInfo); - - log.info("Network connection between " + localBroker + " and " + remoteBroker + " has been established."); - } - - public void stop() throws Exception{ - try { - if( connectionInfo!=null ) { - localBroker.request(connectionInfo.createRemoveCommand()); - remoteBroker.request(connectionInfo.createRemoveCommand()); + protected void triggerRemoteStartBridge() throws IOException{ + Thread thead=new Thread(){ + public void run(){ + try{ + startRemoteBridge(); + }catch(IOException e){ + log.error("Failed to start network bridge: "+e,e); + } } - localBroker.setTransportListener(null); - remoteBroker.setTransportListener(null); - remoteBroker.oneway(new ShutdownInfo()); - localBroker.oneway(new ShutdownInfo()); - }catch(IOException e){ - log.debug("Caught exception stopping",e); - } finally { - ServiceStopper ss = new ServiceStopper(); - ss.stop(localBroker); - ss.stop(remoteBroker); - ss.throwFirstException(); + }; + thead.start(); + } + + protected void startLocalBridge() throws IOException{ + if(localBridgeStarted.compareAndSet(false,true)){ + localConnectionInfo=new ConnectionInfo(); + localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); + localClientId="NC_"+remoteBrokerName+"_inbound"; + localConnectionInfo.setClientId(localClientId); + localBroker.oneway(localConnectionInfo); + localSessionInfo=new SessionInfo(localConnectionInfo,1); + localBroker.oneway(localSessionInfo); + log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName + +") has been established."); } } - - protected void serviceRemoteException(IOException error) { - log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error); + + protected void startRemoteBridge() throws IOException{ + if(remoteBridgeStarted.compareAndSet(false,true)){ + BrokerInfo brokerInfo=new BrokerInfo(); + brokerInfo.setBrokerName(localBrokerName); + remoteBroker.oneway(brokerInfo); + remoteConnectionInfo=new ConnectionInfo(); + remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); + remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"); + remoteBroker.oneway(remoteConnectionInfo); + SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1); + remoteBroker.oneway(remoteSessionInfo); + producerInfo=new ProducerInfo(remoteSessionInfo,1); + producerInfo.setResponseRequired(false); + remoteBroker.oneway(producerInfo); + // Listen to consumer advisory messages on the remote broker to determine demand. + demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1); + demandConsumerInfo.setDispatchAsync(dispatchAsync); + demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + +destinationFilter)); + demandConsumerInfo.setPrefetchSize(prefetchSize); + remoteBroker.oneway(demandConsumerInfo); + } + } + + public void stop() throws Exception{ + if(!disposed){ + try{ + disposed=true; + localBridgeStarted.set(false); + remoteBridgeStarted.set(false); + if(localConnectionInfo!=null){ + localBroker.request(localConnectionInfo.createRemoveCommand()); + remoteBroker.request(remoteConnectionInfo.createRemoveCommand()); + } + localBroker.setTransportListener(null); + remoteBroker.setTransportListener(null); + remoteBroker.oneway(new ShutdownInfo()); + localBroker.oneway(new ShutdownInfo()); + }catch(IOException e){ + log.debug("Caught exception stopping",e); + }finally{ + ServiceStopper ss=new ServiceStopper(); + ss.stop(localBroker); + ss.stop(remoteBroker); + ss.throwFirstException(); + } + } + } + + protected void serviceRemoteException(IOException error){ + log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); ServiceSupport.dispose(this); } - - protected void serviceRemoteCommand(Command command) { - try { - if( command.isMessageDispatch() ) { - MessageDispatch md = (MessageDispatch) command; - serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); - demandConsumerDispatched++; - if( demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize()*.75) ) { - remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched)); - demandConsumerDispatched=0; - } - } else if ( command.isBrokerInfo() ) { - synchronized( this ) { - remoteBrokerId = ((BrokerInfo)command).getBrokerId(); - remoteBrokerPath[0] = remoteBrokerId; - if( localBrokerId !=null) { - if( localBrokerId.equals(remoteBrokerId) ) { - log.info("Disconnecting loop back connection."); - ServiceSupport.dispose(this); - } else { - triggerStartBridge(); + + protected void serviceRemoteCommand(Command command){ + if(!disposed){ + try{ + if(command.isMessageDispatch()){ + MessageDispatch md=(MessageDispatch) command; + serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); + demandConsumerDispatched++; + if(demandConsumerDispatched>(demandConsumerInfo.getPrefetchSize()*.75)){ + remoteBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,demandConsumerDispatched)); + demandConsumerDispatched=0; + } + }else if(command.isBrokerInfo()){ + synchronized(this){ + BrokerInfo remoteBrokerInfo=(BrokerInfo) command; + remoteBrokerId=remoteBrokerInfo.getBrokerId(); + remoteBrokerPath[0]=remoteBrokerId; + remoteBrokerName=remoteBrokerInfo.getBrokerName(); + if(localBrokerId!=null){ + if(localBrokerId.equals(remoteBrokerId)){ + log.info("Disconnecting loop back connection."); + ServiceSupport.dispose(this); + }else{ + triggerLocalStartBridge(); + } } } - } - } else { - switch ( command.getDataStructureType() ) { + }else{ + switch(command.getDataStructureType()){ case WireFormatInfo.DATA_STRUCTURE_TYPE: - break; + break; default: log.warn("Unexpected remote command: "+command); + } } + }catch(IOException e){ + serviceRemoteException(e); } - } catch (IOException e) { - serviceRemoteException(e); } } - private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { - if( data.getClass() == ConsumerInfo.class ) { - + private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{ + if(data.getClass()==ConsumerInfo.class){ // Create a new local subscription - ConsumerInfo info = (ConsumerInfo) data; - BrokerId[] path = info.getBrokerPath(); - - if( (path!=null && path.length>0) || info.isNetworkSubscription() ) { - // Ignore: We only support directly connected brokers for now. + ConsumerInfo info=(ConsumerInfo) data; + BrokerId[] path=info.getBrokerPath(); + if((path!=null&&path.length>0)||info.isNetworkSubscription()){ + // Ignore: We only support directly connected brokers for now. return; } - if( contains(info.getBrokerPath(), localBrokerPath[0]) ) { + if(contains(info.getBrokerPath(),localBrokerPath[0])){ // Ignore this consumer as it's a consumer we locally sent to the broker. return; } - - if( log.isTraceEnabled() ) - log.trace("Forwarding sub on " + localBroker + " from " + remoteBroker + " on "+info); - - + if(log.isTraceEnabled()) + log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info); // Update the packet to show where it came from. - info = info.copy(); - info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(), remoteBrokerPath) ); - - DemandSubscription sub = new DemandSubscription(info); - sub.localInfo.setConsumerId( new ConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()) ); + info=info.copy(); + info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath)); + DemandSubscription sub=new DemandSubscription(info); + sub.localInfo.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator + .getNextSequenceId())); sub.localInfo.setDispatchAsync(dispatchAsync); sub.localInfo.setPrefetchSize(prefetchSize); - byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; - if( priority > Byte.MIN_VALUE && info.getBrokerPath()!=null && info.getBrokerPath().length>1 ) { + byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY; + if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){ // The longer the path to the consumer, the less it's consumer priority. - priority -= info.getBrokerPath().length+1; + priority-=info.getBrokerPath().length+1; } sub.localInfo.setPriority(priority); - subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(), sub); - subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub); + subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub); + subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub); sub.localInfo.setBrokerPath(info.getBrokerPath()); sub.localInfo.setNetworkSubscription(true); // This works for now since we use a VM connection to the local broker. // may need to change if we ever subscribe to a remote broker. sub.localInfo.setAdditionalPredicate(new BooleanExpression(){ - public boolean matches(MessageEvaluationContext message) throws JMSException { - try { + public boolean matches(MessageEvaluationContext message) throws JMSException{ + try{ return matchesForwardingFilter(message.getMessage()); - } catch (IOException e) { + }catch(IOException e){ throw JMSExceptionSupport.create(e); } } - public Object evaluate(MessageEvaluationContext message) throws JMSException { - return matches(message) ? Boolean.TRUE : Boolean.FALSE; + + public Object evaluate(MessageEvaluationContext message) throws JMSException{ + return matches(message)?Boolean.TRUE:Boolean.FALSE; } }); - - localBroker.oneway(sub.localInfo); + localBroker.oneway(sub.localInfo); } - if( data.getClass() == RemoveInfo.class ) { - ConsumerId id = (ConsumerId) ((RemoveInfo)data).getObjectId(); - DemandSubscription sub = (DemandSubscription)subscriptionMapByRemoteId.remove(id); - if( sub !=null ) { + if(data.getClass()==RemoveInfo.class){ + ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId(); + DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id); + if(sub!=null){ subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId()); localBroker.oneway(sub.localInfo.createRemoveCommand()); } } } - protected void serviceLocalException(Throwable error) { - log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error); + protected void serviceLocalException(Throwable error){ + log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); ServiceSupport.dispose(this); } - boolean matchesForwardingFilter(Message message) { - if( message.isRecievedByDFBridge() || contains(message.getBrokerPath(), remoteBrokerPath[0]) ) + boolean matchesForwardingFilter(Message message){ + if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0])) return false; - // Don't propagate advisory messages about network subscriptions - if( message.isAdvisory() - && message.getDataStructure()!=null - && message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) { - + if(message.isAdvisory()&&message.getDataStructure()!=null + &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){ ConsumerInfo info=(ConsumerInfo) message.getDataStructure(); - if(info.isNetworkSubscription()) { + if(info.isNetworkSubscription()){ return false; } } return true; } - - protected void serviceLocalCommand(Command command) { - final boolean trace = log.isTraceEnabled(); - try { - if( command.isMessageDispatch() ) { - MessageDispatch md = (MessageDispatch) command; - Message message = md.getMessage(); - DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId()); - if( sub!=null ) { - - message = message.copy(); - - // Update the packet to show where it came from. - message.setBrokerPath( appendToBrokerPath(message.getBrokerPath(), localBrokerPath) ); - message.setProducerId(producerInfo.getProducerId()); - message.setDestination( md.getDestination() ); - - if( message.getOriginalTransactionId()==null ) - message.setOriginalTransactionId(message.getTransactionId()); - message.setTransactionId(null); - message.setRecievedByDFBridge(true); - message.evictMarshlledForm(); - - if( trace ) - log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message); - if (!message.isPersistent() || !sub.remoteInfo.isDurable()){ - remoteBroker.oneway( message ); - }else{ - Response response = remoteBroker.request(message); - if (response.isException()) { - ExceptionResponse er = (ExceptionResponse) response; - serviceLocalException(er.getException()); - + protected void serviceLocalCommand(Command command){ + if(!disposed){ + final boolean trace=log.isTraceEnabled(); + try{ + if(command.isMessageDispatch()){ + MessageDispatch md=(MessageDispatch) command; + Message message=md.getMessage(); + DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); + if(sub!=null){ + message=message.copy(); + // Update the packet to show where it came from. + message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath)); + message.setProducerId(producerInfo.getProducerId()); + message.setDestination(md.getDestination()); + if(message.getOriginalTransactionId()==null) + message.setOriginalTransactionId(message.getTransactionId()); + message.setTransactionId(null); + message.setRecievedByDFBridge(true); + message.evictMarshlledForm(); + if(trace) + log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message); + if(!message.isPersistent()||!sub.remoteInfo.isDurable()){ + remoteBroker.oneway(message); + }else{ + Response response=remoteBroker.request(message); + if(response.isException()){ + ExceptionResponse er=(ExceptionResponse) response; + serviceLocalException(er.getException()); + } + } + sub.dispatched++; + if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){ + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched)); + sub.dispatched=0; } } - sub.dispatched++; - if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75) ) { - localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, sub.dispatched)); - sub.dispatched=0; - } - - } - } else if ( command.isBrokerInfo() ) { - synchronized( this ) { - localBrokerId = ((BrokerInfo)command).getBrokerId(); - localBrokerPath[0] = localBrokerId; - if( remoteBrokerId !=null ) { - if( remoteBrokerId.equals(localBrokerId) ) { - log.info("Disconnecting loop back connection."); - ServiceSupport.dispose(this); - } else { - triggerStartBridge(); + }else if(command.isBrokerInfo()){ + synchronized(this){ + localBrokerId=((BrokerInfo) command).getBrokerId(); + localBrokerPath[0]=localBrokerId; + if(remoteBrokerId!=null){ + if(remoteBrokerId.equals(localBrokerId)){ + log.info("Disconnecting loop back connection."); + ServiceSupport.dispose(this); + } } } + }else if(command.isShutdownInfo()){ + log.info(localBrokerName+" Shutting down"); + disposed = true; + stop(); + }else{ + switch(command.getDataStructureType()){ + case WireFormatInfo.DATA_STRUCTURE_TYPE: + break; + default: + log.warn("Unexpected local command: "+command); + } } - } else { - switch ( command.getDataStructureType() ) { - case WireFormatInfo.DATA_STRUCTURE_TYPE: - break; - default: - log.warn("Unexpected local command: "+command); - } + }catch(Exception e){ + serviceLocalException(e); } - } catch (Exception e) { - serviceLocalException(e); } } - public String getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public int getPrefetchSize() { + public int getPrefetchSize(){ return prefetchSize; } - public void setPrefetchSize(int prefetchSize) { - this.prefetchSize = prefetchSize; + public void setPrefetchSize(int prefetchSize){ + this.prefetchSize=prefetchSize; } - public boolean isDispatchAsync() { + public boolean isDispatchAsync(){ return dispatchAsync; } - public void setDispatchAsync(boolean dispatchAsync) { - this.dispatchAsync = dispatchAsync; + public void setDispatchAsync(boolean dispatchAsync){ + this.dispatchAsync=dispatchAsync; } - public String getDestinationFilter() { + public String getDestinationFilter(){ return destinationFilter; } - public void setDestinationFilter(String destinationFilter) { - this.destinationFilter = destinationFilter; + + public void setDestinationFilter(String destinationFilter){ + this.destinationFilter=destinationFilter; } - - private boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { - if( brokerPath!=null ) { - for (int i = 0; i < brokerPath.length; i++) { - if( brokerId.equals(brokerPath[i]) ) + + /** + * @return Returns the localBrokerName. + */ + public String getLocalBrokerName(){ + return localBrokerName; + } + + /** + * @param localBrokerName + * The localBrokerName to set. + */ + public void setLocalBrokerName(String localBrokerName){ + this.localBrokerName=localBrokerName; + } + + private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){ + if(brokerPath!=null){ + for(int i=0;i