diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index a84bdb0174..b3e2675457 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -17,6 +17,7 @@ */ package org.apache.activemq.broker; +import java.net.URI; import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; @@ -239,6 +240,14 @@ public interface Broker extends Region, Service { * @param adminConnectionContext */ public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext); - + + /** + * @return the temp data store + */ public Store getTempDataStore(); + + /** + * @return the URI that can be used to connect to the local Broker + */ + public URI getVmConnectorURI(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 30078880f6..8879ccee4a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Map; import java.util.Set; @@ -238,5 +239,8 @@ public class BrokerFilter implements Broker { return next.getTempDataStore(); } + public URI getVmConnectorURI(){ + return next.getVmConnectorURI(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index ac67082652..2b5f9c2713 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -236,5 +237,9 @@ public class EmptyBroker implements Broker { public Store getTempDataStore() { return null; } + + public URI getVmConnectorURI(){ + return null; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 696f8ec608..e93d638db8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -17,6 +17,7 @@ */ package org.apache.activemq.broker; +import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -235,5 +236,9 @@ public class ErrorBroker implements Broker { public Store getTempDataStore() { throw new BrokerStoppedException(this.message); } + + public URI getVmConnectorURI(){ + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 04a5af3021..9e58900005 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -38,6 +38,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Map; import java.util.Set; @@ -249,5 +250,9 @@ public class MutableBrokerFilter implements Broker { public Store getTempDataStore() { return getNext().getTempDataStore(); } + + public URI getVmConnectorURI(){ + return getNext().getVmConnectorURI(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 01b6e1315a..37e5556a14 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -66,6 +67,8 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.network.NetworkBridgeConfiguration; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConsumerState; @@ -77,6 +80,8 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -129,6 +134,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit private ConnectionContext context; private boolean networkConnection; private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private DemandForwardingBridge duplexBridge = null; static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -464,6 +470,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit if(seq>producerState.getLastSequenceId()){ producerState.setLastSequenceId(seq); broker.send(producerExchange,messageSend); + }else { + if (log.isDebugEnabled()) { + log.debug("Discarding duplicate: " + messageSend); + } } }else{ // producer not local to this broker @@ -1063,6 +1073,19 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit masterBroker=new MasterBroker(parent,transport); masterBroker.startProcessing(); log.info("Slave Broker "+info.getBrokerName()+" is attached"); + }else if (info.isNetworkConnection() && info.isDuplexConnection()) { + //so this TransportConnection is the rear end of a network bridge + //We have been requested to create a two way pipe ... + try{ + Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties()); + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + IntrospectionSupport.setProperties(config,props,null); + config.setLocalBrokerName(broker.getBrokerName()); + + + }catch(IOException e){ + log.error("Creating duplex network bridge",e); + } } // We only expect to get one broker info command per connection if(this.brokerInfo!=null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 043cb386a1..72c273bfae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -608,4 +609,8 @@ public class RegionBroker implements Broker { public Store getTempDataStore() { return brokerService.getTempDataStore(); } + + public URI getVmConnectorURI(){ + return brokerService.getVmConnectorURI(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java index f66f03e884..3467eda7e9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -39,6 +39,7 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -231,4 +232,8 @@ public class StubBroker implements Broker { public Store getTempDataStore() { return null; } + + public URI getVmConnectorURI(){ + return null; + } }