diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 48b20b9feb..05ad7c851f 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -360,6 +360,10 @@ **/MultipleTestsWithSpringFactoryBeanTest.* **/MultipleTestsWithXBeanFactoryBeanTest.* **/MultipleTestsWithSpringXBeanFactoryBeanTest.* + + + **/MulticastNetworkTest.* + **/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index eb87321c48..5856ac2cbe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -47,6 +47,7 @@ import org.apache.activemq.broker.jmx.ProxyConnectorView; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.proxy.ProxyConnector; @@ -196,9 +197,7 @@ public class BrokerService implements Service { * @throws Exception */ public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception{ - NetworkConnector connector=new NetworkConnector(); - // add the broker name to the parameters if not set - connector.setUri(discoveryAddress); + NetworkConnector connector=new DiscoveryNetworkConnector(discoveryAddress); return addNetworkConnector(connector); } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java index b577c17083..32c4ce7d1b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java @@ -24,6 +24,7 @@ package org.apache.activemq.command; public class BaseEndpoint implements Endpoint { private String name; + BrokerInfo brokerInfo; public BaseEndpoint(String name) { this.name = name; @@ -34,8 +35,35 @@ public class BaseEndpoint implements Endpoint { } public String toString() { - return "Endpoint[" + name + "]"; + String brokerText = ""; + BrokerId brokerId = getBrokerId(); + if (brokerId != null) { + brokerText = " broker: " + brokerId; + } + return "Endpoint[name:" + name + brokerText + "]"; + } + + /** + * Returns the broker ID for this endpoint, if the endpoint is a broker or + * null + */ + public BrokerId getBrokerId() { + if (brokerInfo != null) { + return brokerInfo.getBrokerId(); + } + return null; + } + + /** + * Returns the broker information for this endpoint, if the endpoint is a + * broker or null + */ + public BrokerInfo getBrokerInfo() { + return brokerInfo; + } + + public void setBrokerInfo(BrokerInfo brokerInfo) { + this.brokerInfo = brokerInfo; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java b/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java index af98eeea90..508caf3690 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Endpoint.java @@ -32,4 +32,18 @@ public interface Endpoint { */ public String getName(); + /** + * Returns the broker ID for this endpoint, if the endpoint is a broker or + * null + */ + public BrokerId getBrokerId(); + + /** + * Returns the broker information for this endpoint, if the endpoint is a + * broker or null + */ + public BrokerInfo getBrokerInfo(); + + public void setBrokerInfo(BrokerInfo brokerInfo); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java new file mode 100644 index 0000000000..f331381c83 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java @@ -0,0 +1,103 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.activemq.network; + +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.command.NetworkBridgeFilter; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.ServiceSupport; + +import java.io.IOException; + +/** + * A demand forwarding bridge which works with multicast style transports where + * a single Transport could be communicating with multiple remote brokers + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport { + + protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; + protected Object brokerInfoMutex = new Object(); + + public CompositeDemandForwardingBridge(Transport localBroker, Transport remoteBroker) { + super(localBroker, remoteBroker); + remoteBrokerName = remoteBroker.toString(); + } + + protected void serviceRemoteBrokerInfo(Command command) throws IOException { + synchronized (brokerInfoMutex) { + BrokerInfo remoteBrokerInfo = (BrokerInfo) command; + BrokerId remoteBrokerId = remoteBrokerInfo.getBrokerId(); + + // lets associate the incoming endpoint with a broker ID so we can refer to it later + Endpoint from = command.getFrom(); + if (from == null) { + log.warn("Incoming command does not have a from endpoint: " + command); + } + else { + from.setBrokerInfo(remoteBrokerInfo); + } + if (localBrokerId != null) { + if (localBrokerId.equals(remoteBrokerId)) { + log.info("Disconnecting loop back connection."); + // waitStarted(); + ServiceSupport.dispose(this); + } + } + if (!disposed) { + triggerLocalStartBridge(); + } + } + } + + protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { + info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getFromBrokerId(info))); + } + + /** + * Returns the broker ID that the command came from + */ + protected BrokerId getFromBrokerId(Command command) throws IOException { + BrokerId answer = null; + Endpoint from = command.getFrom(); + if (from == null) { + log.warn("Incoming command does not have a from endpoint: " + command); + } + else { + answer = from.getBrokerId(); + } + if (answer != null) { + return answer; + } + else { + throw new IOException("No broker ID is available for endpoint: " + from + " from command: " + command); + } + } + + protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { + // TODO is there much we can do here? + } + + protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { + return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java index 24a8910b48..93b319f482 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.filter.DestinationFilter; @@ -27,6 +23,11 @@ import org.apache.activemq.transport.Transport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + /** * Consolidates subscriptions @@ -44,7 +45,7 @@ public class ConduitBridge extends DemandForwardingBridge{ super(localBroker,remoteBroker); } - protected DemandSubscription createDemandSubscription(ConsumerInfo info){ + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{ if (addToAlreadyInterestedConsumers(info)){ return null; //don't want this subscription added diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java index 00748999da..359b0aa68d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java @@ -13,8 +13,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; - import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; @@ -23,6 +21,8 @@ import org.apache.activemq.command.NetworkBridgeFilter; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.ServiceSupport; +import java.io.IOException; + /** * Forwards messages from the local broker to the remote broker based on demand. * @@ -76,9 +76,8 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport { } } } - - protected NetworkBridgeFilter createNetworkBridgeFilter() { + + protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 33f8c3638c..cda91e0654 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -61,14 +61,6 @@ import java.io.IOException; * @version $Revision$ */ public abstract class DemandForwardingBridgeSupport implements Bridge { - protected abstract NetworkBridgeFilter createNetworkBridgeFilter(); - - protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; - - protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info); - - protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; - protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class); protected final Transport localBroker; protected final Transport remoteBroker; @@ -654,11 +646,11 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { } } - protected DemandSubscription createDemandSubscription(ConsumerInfo info) { - return doCreateDemandSubscription(info); + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { + return doCreateDemandSubscription(info); } - protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) { + protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { DemandSubscription result=new DemandSubscription(info); result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator .getNextSequenceId())); @@ -671,7 +663,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { } result.getLocalInfo().setPriority(priority); } - configureDemandSubscription(result); + configureDemandSubscription(info, result); return result; } @@ -688,7 +680,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { return result; } - protected void configureDemandSubscription(DemandSubscription sub) { + protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { sub.getLocalInfo().setDispatchAsync(dispatchAsync); sub.getLocalInfo().setPrefetchSize(prefetchSize); subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub); @@ -696,7 +688,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { // This works for now since we use a VM connection to the local broker. // may need to change if we ever subscribe to a remote broker. - sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter()); + sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); } protected void removeDemandSubscription(ConsumerId id) throws IOException { @@ -715,6 +707,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { protected void clearDownSubscriptions() { } - + + protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException; + + protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException; + + protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; + + protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index bda261f170..129ce00e93 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.network; -import java.util.Set; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; /** * Represents a network bridge interface diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java new file mode 100644 index 0000000000..6a3fac2ea4 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -0,0 +1,243 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; +import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; + +/** + * A network connector which uses a discovery agent to detect the remote brokers + * available and setup a connection to each available remote broker + * + * @org.apache.xbean.XBean element="networkConnector" + * + * @version $Revision$ + */ +public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { + + private DiscoveryAgent discoveryAgent; + private ConcurrentHashMap bridges = new ConcurrentHashMap(); + + public DiscoveryNetworkConnector() { + } + + public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { + setUri(discoveryURI); + } + + public void setUri(URI discoveryURI) throws IOException { + setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); + } + + public void onServiceAdd(DiscoveryEvent event) { + String url = event.getServiceName(); + if (url != null) { + + URI uri; + try { + uri = new URI(url); + } + catch (URISyntaxException e) { + log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); + return; + } + + // Has it allready been added? + if (bridges.containsKey(uri) || localURI.equals(uri)) + return; + + URI connectUri = uri; + if (failover) { + try { + connectUri = new URI("failover:" + connectUri); + } + catch (URISyntaxException e) { + log.warn("Could not create failover URI: " + connectUri); + return; + } + } + + log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + connectUri); + + Transport localTransport; + try { + localTransport = createLocalTransport(); + } + catch (Exception e) { + log.warn("Could not connect to local URI: " + localURI + ": " + e, e); + return; + } + + Transport remoteTransport; + try { + remoteTransport = TransportFactory.connect(connectUri); + } + catch (Exception e) { + ServiceSupport.dispose(localTransport); + log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e); + return; + } + + Bridge bridge = createBridge(localTransport, remoteTransport, event); + bridges.put(uri, bridge); + try { + bridge.start(); + } + catch (Exception e) { + ServiceSupport.dispose(localTransport); + ServiceSupport.dispose(remoteTransport); + log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e, e); + return; + } + } + } + + public void onServiceRemove(DiscoveryEvent event) { + String url = event.getServiceName(); + if (url != null) { + URI uri; + try { + uri = new URI(url); + } + catch (URISyntaxException e) { + log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); + return; + } + + Bridge bridge = (Bridge) bridges.get(uri); + if (bridge == null) + return; + + ServiceSupport.dispose(bridge); + } + } + + public DiscoveryAgent getDiscoveryAgent() { + return discoveryAgent; + } + + public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { + this.discoveryAgent = discoveryAgent; + if (discoveryAgent != null) { + this.discoveryAgent.setDiscoveryListener(this); + this.discoveryAgent.setBrokerName(getBrokerName()); + } + } + + public boolean isFailover() { + return failover; + } + + public void setFailover(boolean reliable) { + this.failover = reliable; + } + + protected void doStart() throws Exception { + if (discoveryAgent == null) { + throw new IllegalStateException("You must configure the 'discoveryAgent' property"); + } + this.discoveryAgent.start(); + super.doStart(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + for (Iterator i = bridges.values().iterator(); i.hasNext();) { + Bridge bridge = (Bridge) i.next(); + try { + bridge.stop(); + } + catch (Exception e) { + stopper.onException(this, e); + } + } + try { + this.discoveryAgent.stop(); + } + catch (Exception e) { + stopper.onException(this, e); + } + + super.doStop(stopper); + } + + protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { + DemandForwardingBridge result = null; + if (conduitSubscriptions) { + if (dynamicOnly) { + result = new ConduitBridge(localTransport, remoteTransport) { + protected void serviceRemoteException(Exception error) { + super.serviceRemoteException(error); + try { + // Notify the discovery agent that the remote broker + // failed. + discoveryAgent.serviceFailed(event); + } + catch (IOException e) { + } + } + }; + } + else { + result = new DurableConduitBridge(localTransport, remoteTransport) { + protected void serviceRemoteException(Exception error) { + super.serviceRemoteException(error); + try { + // Notify the discovery agent that the remote broker + // failed. + discoveryAgent.serviceFailed(event); + } + catch (IOException e) { + } + } + }; + } + } + else { + result = new DemandForwardingBridge(localTransport, remoteTransport) { + protected void serviceRemoteException(Exception error) { + super.serviceRemoteException(error); + try { + // Notify the discovery agent that the remote broker + // failed. + discoveryAgent.serviceFailed(event); + } + catch (IOException e) { + } + } + }; + } + return configureBridge(result); + } + + protected String createName() { + return discoveryAgent.toString(); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index e7340c2678..2c984b64e8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -13,8 +13,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; -import java.util.Iterator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -22,6 +20,9 @@ import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.Transport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.Iterator; /** * Consolidates subscriptions * @@ -41,7 +42,7 @@ public class DurableConduitBridge extends ConduitBridge{ } /** - * Subscriptions for these desitnations are always created + * Subscriptions for these destinations are always created * */ protected void setupStaticDestinations(){ @@ -67,7 +68,7 @@ public class DurableConduitBridge extends ConduitBridge{ } } - protected DemandSubscription createDemandSubscription(ConsumerInfo info){ + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{ if(addToAlreadyInterestedConsumers(info)){ return null; // don't want this subscription added } @@ -79,7 +80,7 @@ public class DurableConduitBridge extends ConduitBridge{ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId())); } if(info.isDurable()){ - // set the subscriber name to something reproducable + // set the subscriber name to something reproducible info.setSubcriptionName(getLocalBrokerName()); } return doCreateDemandSubscription(info); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java index e2f4871a52..fca1a6a003 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; - import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; @@ -40,6 +38,8 @@ import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; + /** * Forwards all messages from the local broker to the remote broker. * diff --git a/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java new file mode 100644 index 0000000000..49b50ae810 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java @@ -0,0 +1,155 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.util.ServiceStopper; + +import java.net.URI; + +/** + * A network connector which uses some kind of multicast-like transport that + * communicates with potentially many remote brokers over a single logical + * {@link Transport} instance such as when using multicast. + * + * This implementation does not depend on multicast at all; any other group + * based transport could be used. + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class MulticastNetworkConnector extends NetworkConnector { + + private Transport localTransport; + private Transport remoteTransport; + private URI remoteURI; + private DemandForwardingBridgeSupport bridge; + + public MulticastNetworkConnector() { + } + + public MulticastNetworkConnector(URI remoteURI) { + this.remoteURI = remoteURI; + } + + // Properties + // ------------------------------------------------------------------------- + + public DemandForwardingBridgeSupport getBridge() { + return bridge; + } + + public void setBridge(DemandForwardingBridgeSupport bridge) { + this.bridge = bridge; + } + + public Transport getLocalTransport() { + return localTransport; + } + + public void setLocalTransport(Transport localTransport) { + this.localTransport = localTransport; + } + + public Transport getRemoteTransport() { + return remoteTransport; + } + + /** + * Sets the remote transport implementation + */ + public void setRemoteTransport(Transport remoteTransport) { + this.remoteTransport = remoteTransport; + } + + public URI getRemoteURI() { + return remoteURI; + } + + /** + * Sets the remote transport URI to some group transport like + * multicast://address:port + */ + public void setRemoteURI(URI remoteURI) { + this.remoteURI = remoteURI; + } + + // Implementation methods + // ------------------------------------------------------------------------- + + protected void doStart() throws Exception { + if (remoteTransport == null) { + if (remoteURI == null) { + throw new IllegalArgumentException("You must specify the remoteURI property"); + } + remoteTransport = TransportFactory.connect(remoteURI); + } + + if (localTransport == null) { + localTransport = createLocalTransport(); + } + + bridge = createBridge(localTransport, remoteTransport); + configureBridge(bridge); + bridge.start(); + + // we need to start the transports after we've created the bridge + remoteTransport.start(); + localTransport.start(); + + super.doStart(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + super.doStop(stopper); + if (bridge != null) { + try { + bridge.stop(); + } + catch (Exception e) { + stopper.onException(this, e); + } + } + if (remoteTransport != null) { + try { + remoteTransport.stop(); + } + catch (Exception e) { + stopper.onException(this, e); + } + } + if (localTransport != null) { + try { + localTransport.stop(); + } + catch (Exception e) { + stopper.onException(this, e); + } + } + } + + protected String createName() { + return remoteTransport.toString(); + } + + protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) { + return new CompositeDemandForwardingBridge(local, remote); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index 9e83a264f7..8791c7a25d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -16,176 +16,46 @@ */ package org.apache.activemq.network; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; -import org.apache.activemq.Service; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.discovery.DiscoveryAgent; -import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; -import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Set; /** - * @org.apache.xbean.XBean - * * @version $Revision$ */ -public class NetworkConnector implements Service, DiscoveryListener { +public abstract class NetworkConnector extends ServiceSupport { - private static final Log log = LogFactory.getLog(NetworkConnector.class); + protected static final Log log = LogFactory.getLog(NetworkConnector.class); + protected URI localURI; private String brokerName = "localhost"; - private DiscoveryAgent discoveryAgent; - private URI localURI; - private ConcurrentHashMap bridges = new ConcurrentHashMap(); private Set durableDestinations; - private boolean failover=true; + protected boolean failover = true; private List excludedDestinations = new CopyOnWriteArrayList(); private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList(); private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); - private boolean dynamicOnly = false; - private boolean conduitSubscriptions = true; + protected boolean dynamicOnly = false; + protected boolean conduitSubscriptions = true; private boolean decreaseNetworkConsumerPriority; private int networkTTL = 1; private String name = "bridge"; - - - public NetworkConnector(){ - - } - - public NetworkConnector(URI localURI, DiscoveryAgent discoveryAgent) throws IOException { + public NetworkConnector() { + } + + public NetworkConnector(URI localURI) { this.localURI = localURI; - setDiscoveryAgent(discoveryAgent); - } - - public void start() throws Exception { - if (discoveryAgent == null) { - throw new IllegalStateException("You must configure the 'discoveryAgent' property"); - } - if (localURI == null) { - throw new IllegalStateException("You must configure the 'localURI' property"); - } - this.discoveryAgent.start(); - log.info("Network Connector "+getName()+" Started"); - } - - public void stop() throws Exception { - this.discoveryAgent.stop(); - for (Iterator i = bridges.values().iterator();i.hasNext();){ - Bridge bridge = (Bridge)i.next(); - bridge.stop(); - } - log.info("Network Connector "+getName()+" Stopped"); - } - - public void onServiceAdd(DiscoveryEvent event) { - String url = event.getServiceName(); - if (url != null) { - - URI uri; - try { - uri = new URI(url); - } - catch (URISyntaxException e) { - log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); - return; - } - - // Has it allready been added? - if (bridges.containsKey(uri) || localURI.equals(uri)) - return; - - URI connectUri = uri; - if( failover ) { - try { - connectUri = new URI("failover:"+connectUri); - } catch (URISyntaxException e) { - log.warn("Could not create failover URI: "+connectUri); - return; - } - } - - log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + connectUri); - - Transport localTransport; - try { - localTransport = TransportFactory.connect(localURI); - } - catch (Exception e) { - log.warn("Could not connect to local URI: " + localURI + ": " + e, e); - return; - } - - Transport remoteTransport; - try { - remoteTransport = TransportFactory.connect(connectUri); - } - catch (Exception e) { - ServiceSupport.dispose(localTransport); - log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e); - return; - } - - Bridge bridge = createBridge(localTransport, remoteTransport, event); - bridges.put(uri, bridge); - try { - bridge.start(); - } - catch (Exception e) { - ServiceSupport.dispose(localTransport); - ServiceSupport.dispose(remoteTransport); - log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e, e); - return; - } - } - } - - public void onServiceRemove(DiscoveryEvent event) { - String url = event.getServiceName(); - if (url != null) { - URI uri; - try { - uri = new URI(url); - } catch (URISyntaxException e) { - log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); - return; - } - - Bridge bridge = (Bridge) bridges.get(uri); - if (bridge == null) - return; - - ServiceSupport.dispose(bridge); - } - } - - // Properties - // ------------------------------------------------------------------------- - public DiscoveryAgent getDiscoveryAgent() { - return discoveryAgent; - } - - public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { - this.discoveryAgent = discoveryAgent; - if (discoveryAgent != null) { - this.discoveryAgent.setDiscoveryListener(this); - this.discoveryAgent.setBrokerName(brokerName); - } } public URI getLocalUri() throws URISyntaxException { @@ -195,245 +65,211 @@ public class NetworkConnector implements Service, DiscoveryListener { public void setLocalUri(URI localURI) { this.localURI = localURI; } - - public void setUri(URI discoveryURI) throws IOException { - setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); - } - - - public boolean isFailover() { - return failover; - } - - public void setFailover(boolean reliable) { - this.failover = reliable; - } - - - /** - * @return Returns the brokerName. - */ - public String getBrokerName(){ - return brokerName; - } - - - /** - * @param brokerName The brokerName to set. - */ - public void setBrokerName(String brokerName){ - this.brokerName=brokerName; - } - /** * @return Returns the name. */ - public String getName(){ - if( name == null ) { - name = discoveryAgent.toString(); + public String getName() { + if (name == null) { + name = createName(); } return name; } /** - * @param name The name to set. + * @param name + * The name to set. */ - public void setName(String name){ - this.name=name; + public void setName(String name) { + this.name = name; } + public String getBrokerName() { + return brokerName; + } + + /** + * @param brokerName + * The brokerName to set. + */ + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } /** * @return Returns the durableDestinations. */ - public Set getDurableDestinations(){ + public Set getDurableDestinations() { return durableDestinations; } - /** - * @param durableDestinations The durableDestinations to set. + * @param durableDestinations + * The durableDestinations to set. */ - public void setDurableDestinations(Set durableDestinations){ - this.durableDestinations=durableDestinations; + public void setDurableDestinations(Set durableDestinations) { + this.durableDestinations = durableDestinations; } - - /** * @return Returns the dynamicOnly. */ - public boolean isDynamicOnly(){ + public boolean isDynamicOnly() { return dynamicOnly; } - /** - * @param dynamicOnly The dynamicOnly to set. + * @param dynamicOnly + * The dynamicOnly to set. */ - public void setDynamicOnly(boolean dynamicOnly){ - this.dynamicOnly=dynamicOnly; + public void setDynamicOnly(boolean dynamicOnly) { + this.dynamicOnly = dynamicOnly; } - + /** * @return Returns the conduitSubscriptions. */ - public boolean isConduitSubscriptions(){ + public boolean isConduitSubscriptions() { return conduitSubscriptions; } - /** - * @param conduitSubscriptions The conduitSubscriptions to set. + * @param conduitSubscriptions + * The conduitSubscriptions to set. */ - public void setConduitSubscriptions(boolean conduitSubscriptions){ - this.conduitSubscriptions=conduitSubscriptions; + public void setConduitSubscriptions(boolean conduitSubscriptions) { + this.conduitSubscriptions = conduitSubscriptions; } - + /** * @return Returns the decreaseNetworkConsumerPriority. */ - public boolean isDecreaseNetworkConsumerPriority(){ + public boolean isDecreaseNetworkConsumerPriority() { return decreaseNetworkConsumerPriority; } /** - * @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set. + * @param decreaseNetworkConsumerPriority + * The decreaseNetworkConsumerPriority to set. */ - public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){ - this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority; + public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) { + this.decreaseNetworkConsumerPriority = decreaseNetworkConsumerPriority; } - + /** * @return Returns the networkTTL. */ - public int getNetworkTTL(){ + public int getNetworkTTL() { return networkTTL; } /** - * @param networkTTL The networkTTL to set. + * @param networkTTL + * The networkTTL to set. */ - public void setNetworkTTL(int networkTTL){ - this.networkTTL=networkTTL; + public void setNetworkTTL(int networkTTL) { + this.networkTTL = networkTTL; } - /** * @return Returns the excludedDestinations. */ - public List getExcludedDestinations(){ + public List getExcludedDestinations() { return excludedDestinations; } + /** - * @param excludedDestinations The excludedDestinations to set. + * @param excludedDestinations + * The excludedDestinations to set. */ - public void setExcludedDestinations(List exludedDestinations){ - this.excludedDestinations=exludedDestinations; - } + public void setExcludedDestinations(List exludedDestinations) { + this.excludedDestinations = exludedDestinations; + } + public void addExcludedDestination(ActiveMQDestination destiantion) { this.excludedDestinations.add(destiantion); } - /** * @return Returns the staticallyIncludedDestinations. */ - public List getStaticallyIncludedDestinations(){ + public List getStaticallyIncludedDestinations() { return staticallyIncludedDestinations; } + /** - * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. + * @param staticallyIncludedDestinations + * The staticallyIncludedDestinations to set. */ - public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){ - this.staticallyIncludedDestinations=staticallyIncludedDestinations; + public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) { + this.staticallyIncludedDestinations = staticallyIncludedDestinations; } + public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { this.staticallyIncludedDestinations.add(destiantion); } - - + /** * @return Returns the dynamicallyIncludedDestinations. */ - public List getDynamicallyIncludedDestinations(){ + public List getDynamicallyIncludedDestinations() { return dynamicallyIncludedDestinations; } + /** - * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set. + * @param dynamicallyIncludedDestinations + * The dynamicallyIncludedDestinations to set. */ - public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){ + public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) { this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; } + public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { this.dynamicallyIncludedDestinations.add(destiantion); } - // Implementation methods // ------------------------------------------------------------------------- - protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { - DemandForwardingBridge result = null; - if (conduitSubscriptions){ - if (dynamicOnly){ - result = new ConduitBridge(localTransport, remoteTransport) { - protected void serviceRemoteException(Exception error) { - super.serviceRemoteException(error); - try { - // Notify the discovery agent that the remote broker failed. - discoveryAgent.serviceFailed(event); - } catch (IOException e) { - } - } - }; - }else { - result = new DurableConduitBridge(localTransport, remoteTransport) { - protected void serviceRemoteException(Exception error) { - super.serviceRemoteException(error); - try { - // Notify the discovery agent that the remote broker failed. - discoveryAgent.serviceFailed(event); - } catch (IOException e) { - } - } - }; - } - }else { - result = new DemandForwardingBridge(localTransport, remoteTransport) { - protected void serviceRemoteException(Exception error) { - super.serviceRemoteException(error); - try { - // Notify the discovery agent that the remote broker failed. - discoveryAgent.serviceFailed(event); - } catch (IOException e) { - } - } - }; - } - result.setLocalBrokerName(brokerName); + protected Bridge configureBridge(DemandForwardingBridgeSupport result) { + result.setLocalBrokerName(getBrokerName()); result.setName(getBrokerName()); result.setNetworkTTL(getNetworkTTL()); result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); - + List destsList = getDynamicallyIncludedDestinations(); - ActiveMQDestination dests[] = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); + ActiveMQDestination dests[] = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setDynamicallyIncludedDestinations(dests); - + destsList = getExcludedDestinations(); - dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); + dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setExcludedDestinations(dests); destsList = getStaticallyIncludedDestinations(); - dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); + dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setStaticallyIncludedDestinations(dests); - - if (durableDestinations != null){ + + if (durableDestinations != null) { ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()]; dest = (ActiveMQDestination[]) durableDestinations.toArray(dest); result.setDurableDestinations(dest); } return result; - } + } + protected abstract String createName(); + + protected void doStart() throws Exception { + if (localURI == null) { + throw new IllegalStateException("You must configure the 'localURI' property"); + } + log.info("Network Connector "+getName()+" Started"); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + log.info("Network Connector "+getName()+" Stopped"); + } + + protected Transport createLocalTransport() throws Exception { + return TransportFactory.connect(localURI); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java new file mode 100644 index 0000000000..5d1bdf9edb --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java @@ -0,0 +1,32 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +/** + * + * @version $Revision$ + */ +public class MulticastNetworkTest extends SimpleNetworkTest { + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/multicast/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/multicast/localBroker.xml"; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index a8a60f50fb..ac1dfd92bf 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -114,11 +114,11 @@ public class SimpleNetworkTest extends TestCase{ } protected void doSetUp() throws Exception{ - Resource resource=new ClassPathResource("org/apache/activemq/network/localBroker.xml"); + Resource resource=new ClassPathResource(getLocalBrokerURI()); BrokerFactoryBean factory=new BrokerFactoryBean(resource); factory.afterPropertiesSet(); localBroker=factory.getBroker(); - resource=new ClassPathResource("org/apache/activemq/network/remoteBroker.xml"); + resource=new ClassPathResource(getRemoteBrokerURI()); factory=new BrokerFactoryBean(resource); factory.afterPropertiesSet(); remoteBroker=factory.getBroker(); @@ -139,4 +139,12 @@ public class SimpleNetworkTest extends TestCase{ localSession=localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); remoteSession=remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); } + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker.xml"; + } } diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml new file mode 100644 index 0000000000..e97f64e03d --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml new file mode 100644 index 0000000000..4290345055 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + + + + + +