diff --git a/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java index 5307015538..a52d457983 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java @@ -16,23 +16,17 @@ */ package org.apache.activemq.network; -import java.io.IOException; - import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A demand forwarding bridge which works with multicast style transports where * a single Transport could be communicating with multiple remote brokers - * + * * @org.apache.xbean.XBean - * - * + * */ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport { - private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class); public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { @@ -41,6 +35,5 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo } protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { - // TODO is there much we can do here? } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java b/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java index 58c4b6b625..fbeaea8e96 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConditionalNetworkBridgeFilterFactory.java @@ -17,6 +17,7 @@ package org.apache.activemq.network; import java.util.List; + import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerInfo; @@ -27,15 +28,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * implement conditional behaviour for queue consumers, - * allows replaying back to origin if no consumers are present on the local broker - * after a configurable delay, irrespective of the networkTTL - * Also allows rate limiting of messages through the network, useful for static includes + * implement conditional behavior for queue consumers, allows replaying back to + * origin if no consumers are present on the local broker after a configurable + * delay, irrespective of the networkTTL Also allows rate limiting of messages + * through the network, useful for static includes * - * @org.apache.xbean.XBean + * @org.apache.xbean.XBean */ - public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { + boolean replayWhenNoConsumers = false; int replayDelay = 0; int rateLimit = 0; @@ -104,13 +105,15 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message); if (match && LOG.isTraceEnabled()) { - LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer"); + LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination() + + "] back to origin in the absence of a local consumer"); } } if (match && rateLimitExceeded()) { if (LOG.isTraceEnabled()) { - LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration); + LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + + ">" + rateLimit + "/" + rateDuration); } match = false; } @@ -124,7 +127,7 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte } private boolean hasNotJustArrived(Message message) { - return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis()); + return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis()); } private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) { @@ -132,7 +135,8 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte for (Subscription sub : consumers) { if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) { if (LOG.isTraceEnabled()) { - LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo()); + LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() + + "] to origin due to existing local consumer: " + sub.getConsumerInfo()); } return false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java index c66cc71b4d..63914614ee 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -19,8 +19,8 @@ package org.apache.activemq.network; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; + import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -31,15 +31,13 @@ import org.slf4j.LoggerFactory; /** * Consolidates subscriptions - * - * */ public class ConduitBridge extends DemandForwardingBridge { private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class); /** * Constructor - * + * * @param localBroker * @param remoteBroker */ @@ -57,38 +55,39 @@ public class ConduitBridge extends DemandForwardingBridge { info.setSelector(null); return doCreateDemandSubscription(info); } - + protected boolean checkPaths(BrokerId[] first, BrokerId[] second) { - if (first == null || second == null) - return true; - if (Arrays.equals(first, second)) - return true; - if (first[0].equals(second[0]) - && first[first.length - 1].equals(second[second.length - 1])) - return false; - else - return true; + if (first == null || second == null) { + return true; + } + if (Arrays.equals(first, second)) { + return true; + } + + if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length - 1])) { + return false; + } else { + return true; + } } protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { // search through existing subscriptions and see if we have a match boolean matched = false; - for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { - DemandSubscription ds = (DemandSubscription)i.next(); + + for (DemandSubscription ds : subscriptionMapByLocalId.values()) { DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination()); if (filter.matches(info.getDestination())) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo() - + " with sub: " + info.getConsumerId()); + LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + + ds.getRemoteInfo() + " with sub: " + info.getConsumerId()); } // add the interest in the subscription - // ds.add(ds.getRemoteInfo().getConsumerId()); if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) { - ds.add(info.getConsumerId()); + ds.add(info.getConsumerId()); } matched = true; - // continue - we want interest to any existing - // DemandSubscriptions + // continue - we want interest to any existing DemandSubscriptions } } return matched; @@ -98,8 +97,7 @@ public class ConduitBridge extends DemandForwardingBridge { protected void removeDemandSubscription(ConsumerId id) throws IOException { List tmpList = new ArrayList(); - for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { - DemandSubscription ds = (DemandSubscription)i.next(); + for (DemandSubscription ds : subscriptionMapByLocalId.values()) { if (ds.remove(id)) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo()); @@ -109,14 +107,12 @@ public class ConduitBridge extends DemandForwardingBridge { tmpList.add(ds); } } - for (Iterator i = tmpList.iterator(); i.hasNext();) { - DemandSubscription ds = i.next(); + + for (DemandSubscription ds : tmpList) { removeSubscription(ds); if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); } } - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java b/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java index 39c147ecbd..838703f9d7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConnectionFilter.java @@ -20,11 +20,12 @@ import java.net.URI; /** * Abstraction that allows you to control which brokers a NetworkConnector connects bridges to. - * - * */ public interface ConnectionFilter { + /** + * Connect the filter to a given location. + * * @param location * @return true if the network connector should establish a connection to the specified location. */ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java b/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java index d3c6c1c0af..0d0e0939a8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DefaultNetworkBridgeFilterFactory.java @@ -21,7 +21,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.NetworkBridgeFilter; /** - * implement default behaviour, filter that will not allow resend to origin + * implement default behavior, filter that will not allow re-send to origin * based on brokerPath and which respects networkTTL * * @org.apache.xbean.XBean diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java index 11682cbaf4..4178145033 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java @@ -17,18 +17,14 @@ package org.apache.activemq.network; import org.apache.activemq.transport.Transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Forwards messages from the local broker to the remote broker based on demand. - * + * * @org.apache.xbean.XBean - * - * + * */ public class DemandForwardingBridge extends DemandForwardingBridgeSupport { - private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class); public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 94c29a58b4..50d241ae9c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -115,8 +115,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; - protected CountDownLatch startedLatch = new CountDownLatch(2); - protected CountDownLatch localStartedLatch = new CountDownLatch(1); + protected final CountDownLatch startedLatch = new CountDownLatch(2); + protected final CountDownLatch localStartedLatch = new CountDownLatch(1); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected NetworkBridgeConfiguration configuration; protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); @@ -324,7 +324,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // determine demand. if (!configuration.isStaticBridge()) { demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); - // always dispatch advisories async so that we never block the producer broker if we are slow + // always dispatch advisory message asynchronously so that we never block the producer + // broker if we are slow demandConsumerInfo.setDispatchAsync(true); String advisoryTopic = configuration.getDestinationFilter(); if (configuration.isBridgeTempDestinations()) { @@ -382,6 +383,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ss.throwFirstException(); } } + if (remoteBrokerInfo != null) { brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); @@ -579,7 +581,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } // in a cyclic network there can be multiple bridges per broker that can propagate - // a network subscription so there is a need to synchronise on a shared entity + // a network subscription so there is a need to synchronize on a shared entity synchronized (brokerService.getVmConnectorURI()) { if (addConsumerInfo(info)) { if (LOG.isDebugEnabled()) { @@ -592,8 +594,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } else if (data.getClass() == DestinationInfo.class) { - // It's a destination info - we want to pass up - // information about temporary destinations + // It's a destination info - we want to pass up information about temporary destinations DestinationInfo destInfo = (DestinationInfo) data; BrokerId[] path = destInfo.getBrokerPath(); if (path != null && path.length >= networkTTL) { @@ -603,8 +604,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return; } if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { - // Ignore this consumer as it's a consumer we locally sent to - // the broker. + // Ignore this consumer as it's a consumer we locally sent to the broker. if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); } @@ -922,7 +922,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { - // Are we not bridging temp destinations? + // Are we not bridging temporary destinations? if (destination.isTemporary()) { if (allowTemporary) { return true; @@ -1118,10 +1118,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br break; } - if ( region instanceof AbstractRegion ) + if ( region instanceof AbstractRegion ) { subs = ((AbstractRegion) region).getSubscriptions().values(); - else + } else { subs = null; + } return subs; } @@ -1137,7 +1138,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); if (info.getDestination().isTemporary()) { // reset the local connection Id - ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); } @@ -1160,10 +1160,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { ConsumerInfo info = new ConsumerInfo(); info.setDestination(destination); - // the remote info held by the DemandSubscription holds the original - // consumerId, - // the local info get's overwritten + // the remote info held by the DemandSubscription holds the original consumerId, + // the local info get's overwritten info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); DemandSubscription result = null; try { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index da03e4f83b..e1000ac34c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -29,18 +29,16 @@ import org.slf4j.LoggerFactory; /** * Represents a network bridge interface - * - * */ public class DemandSubscription { private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); private final ConsumerInfo remoteInfo; private final ConsumerInfo localInfo; - private Set remoteSubsIds = new CopyOnWriteArraySet(); + private final Set remoteSubsIds = new CopyOnWriteArraySet(); + private final AtomicInteger dispatched = new AtomicInteger(0); + private final AtomicBoolean activeWaiter = new AtomicBoolean(); - private AtomicInteger dispatched = new AtomicInteger(0); - private AtomicBoolean activeWaiter = new AtomicBoolean(); private NetworkBridgeFilter networkBridgeFilter; DemandSubscription(ConsumerInfo info) { @@ -52,7 +50,7 @@ public class DemandSubscription { /** * Increment the consumers associated with this subscription - * + * * @param id * @return true if added */ @@ -62,7 +60,7 @@ public class DemandSubscription { /** * Increment the consumers associated with this subscription - * + * * @param id * @return true if removed */ @@ -108,7 +106,8 @@ public class DemandSubscription { } } if (this.dispatched.get() > 0) { - LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried"); + LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, " + + "expect potentially " + this.dispatched.get() + " duplicate deliveried"); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 6c0eb469f7..ec8751629e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -19,15 +19,15 @@ package org.apache.activemq.network; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import javax.management.ObjectName; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; @@ -36,26 +36,22 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.URISupport; -import org.apache.activemq.util.URISupport.CompositeData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.ObjectName; - /** * A network connector which uses a discovery agent to detect the remote brokers * available and setup a connection to each available remote broker - * + * * @org.apache.xbean.XBean element="networkConnector" - * + * */ public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); private DiscoveryAgent discoveryAgent; - private Map parameters; - + public DiscoveryNetworkConnector() { } @@ -71,8 +67,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); } catch (URISyntaxException e) { LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); - } - + } } public void onServiceAdd(DiscoveryEvent event) { @@ -89,20 +84,27 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); return; } + // Should we try to connect to that URI? synchronized (bridges) { if( bridges.containsKey(uri) ) { - LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); + if (LOG.isDebugEnabled()) { + LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); + } return; } } if (localURI.equals(uri)) { - LOG.debug("not connecting loopback: " + uri); + if (LOG.isDebugEnabled()) { + LOG.debug("not connecting loopback: " + uri); + } return; } if (connectionFilter != null && !connectionFilter.connectTo(uri)) { - LOG.debug("connectionFilter disallows connection to: " + uri); + if (LOG.isDebugEnabled()) { + LOG.debug("connectionFilter disallows connection to: " + uri); + } return; } @@ -112,7 +114,10 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco } catch (URISyntaxException e) { LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); } - LOG.info("Establishing network connection from " + localURI + " to " + connectUri); + + if (LOG.isInfoEnabled()) { + LOG.info("Establishing network connection from " + localURI + " to " + connectUri); + } Transport remoteTransport; Transport localTransport; @@ -123,7 +128,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco remoteTransport = TransportFactory.connect(connectUri); } catch (Exception e) { LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); - LOG.debug("Connection failure exception: " + e, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection failure exception: " + e, e); + } return; } try { @@ -131,7 +138,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco } catch (Exception e) { ServiceSupport.dispose(remoteTransport); LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage()); - LOG.debug("Connection failure exception: " + e, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection failure exception: " + e, e); + } return; } } finally { @@ -147,11 +156,15 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco ServiceSupport.dispose(localTransport); ServiceSupport.dispose(remoteTransport); LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e); - LOG.debug("Start failure exception: " + e, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Start failure exception: " + e, e); + } try { discoveryAgent.serviceFailed(event); } catch (IOException e1) { - LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); + if (LOG.isDebugEnabled()) { + LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); + } } } } @@ -168,9 +181,8 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco return; } - NetworkBridge bridge; synchronized (bridges) { - bridge = bridges.remove(uri); + bridges.remove(uri); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 54d8ebd23d..dda74f76eb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -17,7 +17,6 @@ package org.apache.activemq.network; import java.io.IOException; -import java.util.Iterator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; @@ -29,17 +28,15 @@ import org.slf4j.LoggerFactory; /** * Consolidates subscriptions - * - * */ public class DurableConduitBridge extends ConduitBridge { private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); /** * Constructor - * + * * @param configuration - * + * * @param localBroker * @param remoteBroker */ @@ -50,14 +47,13 @@ public class DurableConduitBridge extends ConduitBridge { /** * Subscriptions for these destinations are always created - * + * */ protected void setupStaticDestinations() { super.setupStaticDestinations(); ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; if (dests != null) { - for (int i = 0; i < dests.length; i++) { - ActiveMQDestination dest = dests[i]; + for (ActiveMQDestination dest : dests) { if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { DemandSubscription sub = createDemandSubscription(dest); if (dest.isTopic()) { @@ -88,8 +84,8 @@ public class DurableConduitBridge extends ConduitBridge { info.setSubscriptionName(getSubscriberName(info.getDestination())); // and override the consumerId with something unique so that it won't // be removed if the durable subscriber (at the other end) goes away - info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator - .getNextSequenceId())); + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), + consumerIdGenerator.getNextSequenceId())); } info.setSelector(null); return doCreateDemandSubscription(info); @@ -102,8 +98,7 @@ public class DurableConduitBridge extends ConduitBridge { protected boolean doesConsumerExist(ActiveMQDestination dest) { DestinationFilter filter = DestinationFilter.parseFilter(dest); - for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { - DemandSubscription ds = (DemandSubscription)i.next(); + for (DemandSubscription ds : subscriptionMapByLocalId.values()) { if (filter.matches(ds.getLocalInfo().getDestination())) { return true; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java index c5bca729a5..5f90e66167 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java @@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory; * * @org.apache.xbean.XBean * - * */ public class ForwardingBridge implements Service { @@ -162,8 +161,11 @@ public class ForwardingBridge implements Service { topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); localBroker.oneway(topicConsumerInfo); } - LOG.info("Network connection between " + localBroker + " and " + remoteBroker - + " has been established."); + + if (LOG.isInfoEnabled()) { + LOG.info("Network connection between " + localBroker + " and " + remoteBroker + + " has been established."); + } } public void stop() throws Exception { @@ -186,7 +188,9 @@ public class ForwardingBridge implements Service { public void serviceRemoteException(Throwable error) { LOG.info("Unexpected remote exception: " + error); - LOG.debug("Exception trace: ", error); + if (LOG.isDebugEnabled()) { + LOG.debug("Exception trace: ", error); + } } protected void serviceRemoteCommand(Command command) { @@ -399,5 +403,4 @@ public class ForwardingBridge implements Service { public boolean isUseCompression() { return useCompression; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java index cce0110ea6..9f83b9bd0e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java @@ -19,7 +19,6 @@ package org.apache.activemq.network; import java.net.URI; import java.util.Hashtable; import java.util.Map; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import javax.naming.CommunicationException; @@ -42,415 +41,403 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * class to create dynamic network connectors listed in an directory - * server using the LDAP v3 protocol as defined in RFC 2251, the - * entries listed in the directory server must implement the ipHost - * and ipService objectClasses as defined in RFC 2307. - * - * @author Trevor Pounds + * class to create dynamic network connectors listed in an directory server + * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the + * directory server must implement the ipHost and ipService objectClasses as + * defined in RFC 2307. + * * @see RFC 2251 * @see RFC 2307 * * @org.apache.xbean.XBean element="ldapNetworkConnector" */ -public class LdapNetworkConnector - extends NetworkConnector - implements NamespaceChangeListener, - ObjectChangeListener -{ - private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class); +public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener { + private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class); - // force returned entries to implement the ipHost and ipService object classes (RFC 2307) - private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))"; + // force returned entries to implement the ipHost and ipService object classes (RFC 2307) + private static final String REQUIRED_OBJECT_CLASS_FILTER = + "(&(objectClass=ipHost)(objectClass=ipService))"; - // connection - private URI[] availableURIs = null; - private int availableURIsIndex = 0; - private String base = null; - private boolean failover = false; - private long curReconnectDelay = 1000; /* 1 sec */ - private long maxReconnectDelay = 30000; /* 30 sec */ + // connection + private URI[] availableURIs = null; + private int availableURIsIndex = 0; + private String base = null; + private boolean failover = false; + private long curReconnectDelay = 1000; /* 1 sec */ + private long maxReconnectDelay = 30000; /* 30 sec */ - // authentication - private String user = null; - private String password = null; - private boolean anonymousAuthentication = false; + // authentication + private String user = null; + private String password = null; + private boolean anonymousAuthentication = false; - // search - private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */); - private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER; - private boolean searchEventListener = false; + // search + private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */); + private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER; + private boolean searchEventListener = false; - // connector management - private Map connectorMap = new ConcurrentHashMap(); - private Map referenceMap = new ConcurrentHashMap(); - private Map uuidMap = new ConcurrentHashMap(); + // connector management + private Map connectorMap = new ConcurrentHashMap(); + private Map referenceMap = new ConcurrentHashMap(); + private Map uuidMap = new ConcurrentHashMap(); - // local context - private DirContext context = null; - //currently in use URI - private URI ldapURI = null; + // local context + private DirContext context = null; + // currently in use URI + private URI ldapURI = null; - /** - * returns the next URI from the configured list - * - * @return random URI from the configured list - */ - public URI getUri() - { return availableURIs[++availableURIsIndex % availableURIs.length]; } + /** + * returns the next URI from the configured list + * + * @return random URI from the configured list + */ + public URI getUri() { + return availableURIs[++availableURIsIndex % availableURIs.length]; + } - /** - * sets the LDAP server URI - * - * @param _uri LDAP server URI - */ - public void setUri(URI _uri) - throws Exception - { - CompositeData data = URISupport.parseComposite(_uri); - if(data.getScheme().equals("failover")) - { - availableURIs = data.getComponents(); - failover = true; - } - else - { availableURIs = new URI[]{ _uri }; } - } + /** + * sets the LDAP server URI + * + * @param _uri + * LDAP server URI + */ + public void setUri(URI uri) throws Exception { + CompositeData data = URISupport.parseComposite(uri); + if (data.getScheme().equals("failover")) { + availableURIs = data.getComponents(); + failover = true; + } else { + availableURIs = new URI[] { uri }; + } + } - /** - * sets the base LDAP dn used for lookup operations - * - * @param _base LDAP base dn - */ - public void setBase(String _base) - { base = _base; } + /** + * sets the base LDAP dn used for lookup operations + * + * @param _base + * LDAP base dn + */ + public void setBase(String base) { + this.base = base; + } - /** - * sets the LDAP user for access credentials - * - * @param _user LDAP dn of user - */ - public void setUser(String _user) - { user = _user; } + /** + * sets the LDAP user for access credentials + * + * @param _user + * LDAP dn of user + */ + public void setUser(String user) { + this.user = user; + } - /** - * sets the LDAP password for access credentials - * - * @param _password user password - */ - public void setPassword(String _password) - { password = _password; } + /** + * sets the LDAP password for access credentials + * + * @param _password + * user password + */ + public void setPassword(String password) { + this.password = password; + } - /** - * sets LDAP anonymous authentication access credentials - * - * @param _anonymousAuthentication set to true to use anonymous authentication - */ - public void setAnonymousAuthentication(boolean _anonymousAuthentication) - { anonymousAuthentication = _anonymousAuthentication; } + /** + * sets LDAP anonymous authentication access credentials + * + * @param _anonymousAuthentication + * set to true to use anonymous authentication + */ + public void setAnonymousAuthentication(boolean anonymousAuthentication) { + this.anonymousAuthentication = anonymousAuthentication; + } - /** - * sets the LDAP search scope - * - * @param _searchScope LDAP JNDI search scope - */ - public void setSearchScope(String _searchScope) - throws Exception - { - int scope; - if(_searchScope.equals("OBJECT_SCOPE")) - { scope = SearchControls.OBJECT_SCOPE; } - else if(_searchScope.equals("ONELEVEL_SCOPE")) - { scope = SearchControls.ONELEVEL_SCOPE; } - else if(_searchScope.equals("SUBTREE_SCOPE")) - { scope = SearchControls.SUBTREE_SCOPE; } - else - { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); } - searchControls.setSearchScope(scope); - } + /** + * sets the LDAP search scope + * + * @param _searchScope + * LDAP JNDI search scope + */ + public void setSearchScope(String searchScope) throws Exception { + int scope; + if (searchScope.equals("OBJECT_SCOPE")) { + scope = SearchControls.OBJECT_SCOPE; + } else if (searchScope.equals("ONELEVEL_SCOPE")) { + scope = SearchControls.ONELEVEL_SCOPE; + } else if (searchScope.equals("SUBTREE_SCOPE")) { + scope = SearchControls.SUBTREE_SCOPE; + } else { + throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope); + } + searchControls.setSearchScope(scope); + } - /** - * sets the LDAP search filter as defined in RFC 2254 - * - * @param _searchFilter LDAP search filter - * @see RFC 2254 - */ - public void setSearchFilter(String _searchFilter) - { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; } + /** + * sets the LDAP search filter as defined in RFC 2254 + * + * @param _searchFilter + * LDAP search filter + * @see RFC 2254 + */ + public void setSearchFilter(String searchFilter) { + this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))"; + } - /** - * enables/disable a persistent search to the LDAP server as defined - * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3) - * - * @param _searchEventListener enable = true, disable = false (default) - * @see draft-ietf-ldapext-psearch-03.txt - */ - public void setSearchEventListener(boolean _searchEventListener) - { searchEventListener = _searchEventListener; } + /** + * enables/disable a persistent search to the LDAP server as defined in + * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3) + * + * @param _searchEventListener + * enable = true, disable = false (default) + * @see draft-ietf-ldapext-psearch-03.txt + */ + public void setSearchEventListener(boolean searchEventListener) { + this.searchEventListener = searchEventListener; + } - /** - * start the connector - */ - public void start() - throws Exception - { - LOG.info("connecting..."); - Hashtable env = new Hashtable(); - env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); - this.ldapURI = getUri(); - LOG.debug(" URI [" + this.ldapURI + "]"); - env.put(Context.PROVIDER_URL, this.ldapURI.toString()); - if(anonymousAuthentication) - { - LOG.debug(" login credentials [anonymous]"); - env.put(Context.SECURITY_AUTHENTICATION, "none"); - } - else - { - LOG.debug(" login credentials [" + user + ":******]"); - env.put(Context.SECURITY_PRINCIPAL, user); - env.put(Context.SECURITY_CREDENTIALS, password); - } - boolean isConnected = false; - while(!isConnected) - { - try - { - context = new InitialDirContext(env); - isConnected = true; - } - catch(CommunicationException err) - { - if(failover) - { - this.ldapURI = getUri(); - LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]"); - env.put(Context.PROVIDER_URL, this.ldapURI.toString()); - Thread.sleep(curReconnectDelay); - curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay); + /** + * start the connector + */ + public void start() throws Exception { + LOG.info("connecting..."); + Hashtable env = new Hashtable(); + env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); + this.ldapURI = getUri(); + LOG.debug(" URI [" + this.ldapURI + "]"); + env.put(Context.PROVIDER_URL, this.ldapURI.toString()); + if (anonymousAuthentication) { + LOG.debug(" login credentials [anonymous]"); + env.put(Context.SECURITY_AUTHENTICATION, "none"); + } else { + LOG.debug(" login credentials [" + user + ":******]"); + env.put(Context.SECURITY_PRINCIPAL, user); + env.put(Context.SECURITY_CREDENTIALS, password); + } + boolean isConnected = false; + while (!isConnected) { + try { + context = new InitialDirContext(env); + isConnected = true; + } catch (CommunicationException err) { + if (failover) { + this.ldapURI = getUri(); + LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]"); + env.put(Context.PROVIDER_URL, this.ldapURI.toString()); + Thread.sleep(curReconnectDelay); + curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay); + } else { + throw err; + } } - else - { throw err; } - } - } + } - // add connectors from search results - LOG.info("searching for network connectors..."); - LOG.debug(" base [" + base + "]"); - LOG.debug(" filter [" + searchFilter + "]"); - LOG.debug(" scope [" + searchControls.getSearchScope() + "]"); - NamingEnumeration results = context.search(base, searchFilter, searchControls); - while(results.hasMore()) - { addConnector(results.next()); } + // add connectors from search results + LOG.info("searching for network connectors..."); + LOG.debug(" base [" + base + "]"); + LOG.debug(" filter [" + searchFilter + "]"); + LOG.debug(" scope [" + searchControls.getSearchScope() + "]"); + NamingEnumeration results = context.search(base, searchFilter, searchControls); + while (results.hasMore()) { + addConnector(results.next()); + } - // register persistent search event listener - if(searchEventListener) - { - LOG.info("registering persistent search listener..."); - EventDirContext eventContext = (EventDirContext)context.lookup(""); - eventContext.addNamingListener(base, searchFilter, searchControls, this); - } - else // otherwise close context (i.e. connection as it is no longer needed) - { context.close(); } - } + // register persistent search event listener + if (searchEventListener) { + LOG.info("registering persistent search listener..."); + EventDirContext eventContext = (EventDirContext) context.lookup(""); + eventContext.addNamingListener(base, searchFilter, searchControls, this); + } else { // otherwise close context (i.e. connection as it is no longer needed) + context.close(); + } + } - /** - * stop the connector - */ - public void stop() - throws Exception - { - LOG.info("stopping context..."); - for(NetworkConnector connector : connectorMap.values()) - { connector.stop(); } - connectorMap.clear(); - referenceMap.clear(); - uuidMap.clear(); - context.close(); - } + /** + * stop the connector + */ + public void stop() throws Exception { + LOG.info("stopping context..."); + for (NetworkConnector connector : connectorMap.values()) { + connector.stop(); + } + connectorMap.clear(); + referenceMap.clear(); + uuidMap.clear(); + context.close(); + } - public String toString() { - return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; - } + public String toString() { + return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; + } - /** + /** * add connector of the given URI - * + * * @param result * search result of connector to add */ - protected synchronized void addConnector(SearchResult result) - throws Exception - { - String uuid = toUUID(result); - if(uuidMap.containsKey(uuid)) - { - LOG.warn("connector already regsitered for UUID [" + uuid + "]"); - return; - } + protected synchronized void addConnector(SearchResult result) throws Exception { + String uuid = toUUID(result); + if (uuidMap.containsKey(uuid)) { + LOG.warn("connector already regsitered for UUID [" + uuid + "]"); + return; + } - URI connectorURI = toURI(result); - if(connectorMap.containsKey(connectorURI)) - { - int referenceCount = referenceMap.get(connectorURI) + 1; - LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]"); - referenceMap.put(connectorURI, referenceCount); - uuidMap.put(uuid, connectorURI); - return; - } + URI connectorURI = toURI(result); + if (connectorMap.containsKey(connectorURI)) { + int referenceCount = referenceMap.get(connectorURI) + 1; + LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]"); + referenceMap.put(connectorURI, referenceCount); + uuidMap.put(uuid, connectorURI); + return; + } - // FIXME: disable JMX listing of LDAP managed connectors, we will - // want to map/manage these differently in the future -// boolean useJMX = getBrokerService().isUseJmx(); -// getBrokerService().setUseJmx(false); - NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI); -// getBrokerService().setUseJmx(useJMX); + // FIXME: disable JMX listing of LDAP managed connectors, we will + // want to map/manage these differently in the future + // boolean useJMX = getBrokerService().isUseJmx(); + // getBrokerService().setUseJmx(false); + NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI); + // getBrokerService().setUseJmx(useJMX); - // propogate std connector properties that may have been set via XML - connector.setDynamicOnly(isDynamicOnly()); - connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); - connector.setNetworkTTL(getNetworkTTL()); - connector.setConduitSubscriptions(isConduitSubscriptions()); - connector.setExcludedDestinations(getExcludedDestinations()); - connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations()); - connector.setDuplex(isDuplex()); + // Propagate standard connector properties that may have been set via XML + connector.setDynamicOnly(isDynamicOnly()); + connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); + connector.setNetworkTTL(getNetworkTTL()); + connector.setConduitSubscriptions(isConduitSubscriptions()); + connector.setExcludedDestinations(getExcludedDestinations()); + connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations()); + connector.setDuplex(isDuplex()); - // XXX: set in the BrokerService.startAllConnectors method and is - // required to prevent remote broker exceptions upon connection - connector.setLocalUri(getBrokerService().getVmConnectorURI()); - connector.setBrokerName(getBrokerService().getBrokerName()); - connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations()); + // XXX: set in the BrokerService.startAllConnectors method and is + // required to prevent remote broker exceptions upon connection + connector.setLocalUri(getBrokerService().getVmConnectorURI()); + connector.setBrokerName(getBrokerService().getBrokerName()); + connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations()); - // start network connector - connectorMap.put(connectorURI, connector); - referenceMap.put(connectorURI, 1); - uuidMap.put(uuid, connectorURI); - connector.start(); - LOG.info("connector added with URI [" + connectorURI + "]"); - } + // start network connector + connectorMap.put(connectorURI, connector); + referenceMap.put(connectorURI, 1); + uuidMap.put(uuid, connectorURI); + connector.start(); + LOG.info("connector added with URI [" + connectorURI + "]"); + } - /** - * remove connector of the given URI - * - * @param result search result of connector to remove - */ - protected synchronized void removeConnector(SearchResult result) - throws Exception - { - String uuid = toUUID(result); - if(!uuidMap.containsKey(uuid)) - { - LOG.warn("connector not regsitered for UUID [" + uuid + "]"); - return; - } + /** + * remove connector of the given URI + * + * @param result + * search result of connector to remove + */ + protected synchronized void removeConnector(SearchResult result) throws Exception { + String uuid = toUUID(result); + if (!uuidMap.containsKey(uuid)) { + LOG.warn("connector not regsitered for UUID [" + uuid + "]"); + return; + } - URI connectorURI = uuidMap.get(uuid); - if(!connectorMap.containsKey(connectorURI)) - { - LOG.warn("connector not regisitered for URI [" + connectorURI + "]"); - return; - } + URI connectorURI = uuidMap.get(uuid); + if (!connectorMap.containsKey(connectorURI)) { + LOG.warn("connector not regisitered for URI [" + connectorURI + "]"); + return; + } - int referenceCount = referenceMap.get(connectorURI) - 1; - referenceMap.put(connectorURI, referenceCount); - uuidMap.remove(uuid); - LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]"); + int referenceCount = referenceMap.get(connectorURI) - 1; + referenceMap.put(connectorURI, referenceCount); + uuidMap.remove(uuid); + LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]"); - if(referenceCount > 0) - { return; } + if (referenceCount > 0) { + return; + } - NetworkConnector connector = connectorMap.remove(connectorURI); - connector.stop(); - LOG.info("connector removed with URI [" + connectorURI + "]"); - } + NetworkConnector connector = connectorMap.remove(connectorURI); + connector.stop(); + LOG.info("connector removed with URI [" + connectorURI + "]"); + } - /** - * convert search result into URI - * - * @param result search result to convert to URI - */ - protected URI toURI(SearchResult result) - throws Exception - { - Attributes attributes = result.getAttributes(); - String address = (String)attributes.get("iphostnumber").get(); - String port = (String)attributes.get("ipserviceport").get(); - String protocol = (String)attributes.get("ipserviceprotocol").get(); - URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")"); - LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]"); - return connectorURI; - } + /** + * convert search result into URI + * + * @param result + * search result to convert to URI + */ + protected URI toURI(SearchResult result) throws Exception { + Attributes attributes = result.getAttributes(); + String address = (String) attributes.get("iphostnumber").get(); + String port = (String) attributes.get("ipserviceport").get(); + String protocol = (String) attributes.get("ipserviceprotocol").get(); + URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")"); + LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]"); + return connectorURI; + } - /** - * convert search result into URI - * - * @param result search result to convert to URI - */ - protected String toUUID(SearchResult result) - { - String uuid = result.getNameInNamespace(); - LOG.debug("retrieved UUID from SearchResult [" + uuid + "]"); - return uuid; - } + /** + * convert search result into URI + * + * @param result + * search result to convert to URI + */ + protected String toUUID(SearchResult result) { + String uuid = result.getNameInNamespace(); + LOG.debug("retrieved UUID from SearchResult [" + uuid + "]"); + return uuid; + } - /** - * invoked when an entry has been added during a persistent search - */ - public void objectAdded(NamingEvent event) - { - LOG.debug("entry added"); - try - { addConnector((SearchResult)event.getNewBinding()); } - catch(Exception err) - { LOG.error("ERR: caught unexpected exception", err); } - } + /** + * invoked when an entry has been added during a persistent search + */ + public void objectAdded(NamingEvent event) { + LOG.debug("entry added"); + try { + addConnector((SearchResult) event.getNewBinding()); + } catch (Exception err) { + LOG.error("ERR: caught unexpected exception", err); + } + } - /** - * invoked when an entry has been removed during a persistent search - */ - public void objectRemoved(NamingEvent event) - { - LOG.debug("entry removed"); - try - { removeConnector((SearchResult)event.getOldBinding()); } - catch(Exception err) - { LOG.error("ERR: caught unexpected exception", err); } - } + /** + * invoked when an entry has been removed during a persistent search + */ + public void objectRemoved(NamingEvent event) { + LOG.debug("entry removed"); + try { + removeConnector((SearchResult) event.getOldBinding()); + } catch (Exception err) { + LOG.error("ERR: caught unexpected exception", err); + } + } - /** - * invoked when an entry has been renamed during a persistent search - */ - public void objectRenamed(NamingEvent event) - { - LOG.debug("entry renamed"); - // XXX: getNameInNamespace method does not seem to work properly, - // but getName seems to provide the result we want - String uuidOld = event.getOldBinding().getName(); - String uuidNew = event.getNewBinding().getName(); - URI connectorURI = uuidMap.remove(uuidOld); - uuidMap.put(uuidNew, connectorURI); - LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]"); - } + /** + * invoked when an entry has been renamed during a persistent search + */ + public void objectRenamed(NamingEvent event) { + LOG.debug("entry renamed"); + // XXX: getNameInNamespace method does not seem to work properly, + // but getName seems to provide the result we want + String uuidOld = event.getOldBinding().getName(); + String uuidNew = event.getNewBinding().getName(); + URI connectorURI = uuidMap.remove(uuidOld); + uuidMap.put(uuidNew, connectorURI); + LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]"); + } - /** - * invoked when an entry has been changed during a persistent search - */ - public void objectChanged(NamingEvent event) - { - LOG.debug("entry changed"); - try - { - SearchResult result = (SearchResult)event.getNewBinding(); - removeConnector(result); - addConnector(result); - } - catch(Exception err) - { LOG.error("ERR: caught unexpected exception", err); } - } + /** + * invoked when an entry has been changed during a persistent search + */ + public void objectChanged(NamingEvent event) { + LOG.debug("entry changed"); + try { + SearchResult result = (SearchResult) event.getNewBinding(); + removeConnector(result); + addConnector(result); + } catch (Exception err) { + LOG.error("ERR: caught unexpected exception", err); + } + } - /** - * invoked when an exception has occurred during a persistent search - */ - public void namingExceptionThrown(NamingExceptionEvent event) - { LOG.error("ERR: caught unexpected exception", event.getException()); } + /** + * invoked when an exception has occurred during a persistent search + */ + public void namingExceptionThrown(NamingExceptionEvent event) { + LOG.error("ERR: caught unexpected exception", event.getException()); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java index 4cc3de008f..d7ea253b74 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.network; +import java.util.HashMap; +import java.util.Map; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.NetworkBridgeView; @@ -24,11 +30,6 @@ import org.apache.activemq.util.JMXSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import java.util.HashMap; -import java.util.Map; - public class MBeanNetworkListener implements NetworkBridgeListener { private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class); @@ -44,7 +45,6 @@ public class MBeanNetworkListener implements NetworkBridgeListener { @Override public void bridgeFailed() { - } @Override @@ -81,7 +81,6 @@ public class MBeanNetworkListener implements NetworkBridgeListener { } } - protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { Map map = new HashMap(connectorName.getKeyPropertyList()); return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String) map.get("BrokerName")) + "," + "Type=NetworkBridge," diff --git a/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java index 1498e2c4b6..ad7ac5babe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java @@ -26,13 +26,12 @@ import org.apache.activemq.util.ServiceStopper; * A network connector which uses some kind of multicast-like transport that * communicates with potentially many remote brokers over a single logical * {@link Transport} instance such as when using multicast. - * + * * This implementation does not depend on multicast at all; any other group * based transport could be used. - * + * * @org.apache.xbean.XBean - * - * + * */ public class MulticastNetworkConnector extends NetworkConnector { @@ -151,5 +150,4 @@ public class MulticastNetworkConnector extends NetworkConnector { bridge.setBrokerService(getBrokerService()); return bridge; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java index 49d998ab82..9ba3c9fd2f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridge.java @@ -17,48 +17,70 @@ package org.apache.activemq.network; import javax.management.ObjectName; -import org.apache.activemq.Service; +import org.apache.activemq.Service; /** * Represents a network bridge interface - * - * */ public interface NetworkBridge extends Service { - + /** - * Service an exception + * Service an exception received from the Remote Broker connection. * @param error */ void serviceRemoteException(Throwable error); - + /** - * servicee an exception + * Service an exception received from the Local Broker connection. * @param error */ void serviceLocalException(Throwable error); - + /** * Set the NetworkBridgeFailedListener * @param listener */ void setNetworkBridgeListener(NetworkBridgeListener listener); - - + + /** + * @return the network address of the remote broker connection. + */ String getRemoteAddress(); + /** + * @return the name of the remote broker this bridge is connected to. + */ String getRemoteBrokerName(); + /** + * @return the network address of the local broker connection. + */ String getLocalAddress(); + /** + * @return the name of the local broker this bridge is connected to. + */ String getLocalBrokerName(); + /** + * @return the current number of enqueues this bridge has. + */ long getEnqueueCounter(); + /** + * @return the current number of dequeues this bridge has. + */ long getDequeueCounter(); + /** + * @param objectName + * The ObjectName assigned to this bridge in the MBean server. + */ void setMbeanObjectName(ObjectName objectName); + /** + * @return the MBean name used to identify this bridge in the MBean server. + */ ObjectName getMbeanObjectName(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index a4288a0c3b..dde53148d9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -24,8 +24,6 @@ import org.apache.activemq.command.ConsumerInfo; /** * Configuration for a NetworkBridge - * - * */ public class NetworkBridgeConfiguration { private boolean conduitSubscriptions = true; @@ -233,7 +231,7 @@ public class NetworkBridgeConfiguration { } else { return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; } - } else { + } else { // prepend consumer advisory prefix // to keep backward compatibility if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX)) { @@ -292,8 +290,6 @@ public class NetworkBridgeConfiguration { this.staticallyIncludedDestinations = staticallyIncludedDestinations; } - - public boolean isSuppressDuplicateQueueSubscriptions() { return suppressDuplicateQueueSubscriptions; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index cd5486e4d5..34e75a112e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -46,7 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Connector class for bridging broker networks. */ public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { @@ -54,7 +54,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem protected URI localURI; protected ConnectionFilter connectionFilter; protected ConcurrentHashMap bridges = new ConcurrentHashMap(); - + protected ServiceSupport serviceSupport = new ServiceSupport() { protected void doStart() throws Exception { @@ -72,7 +72,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); private BrokerService brokerService; private ObjectName objectName; - + public NetworkConnector() { } @@ -91,7 +91,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem /** * @return Returns the durableDestinations. */ - public Set getDurableDestinations() { + public Set getDurableDestinations() { return durableDestinations; } @@ -179,14 +179,14 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setStaticallyIncludedDestinations(dests); if (durableDestinations != null) { - + HashSet topics = new HashSet(); for (ActiveMQDestination d : durableDestinations) { if( d.isTopic() ) { topics.add(d); } } - + ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; dest = (ActiveMQDestination[])topics.toArray(dest); result.setDurableDestinations(dest); @@ -218,7 +218,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem } public boolean isStarted() { - return serviceSupport.isStarted(); + return serviceSupport.isStarted(); } public boolean isStopped() { @@ -269,9 +269,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); } } - - @SuppressWarnings("unchecked") protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { ObjectName connectorName = getObjectName(); Map map = new HashMap(connectorName.getKeyPropertyList()); @@ -294,9 +292,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem } return removeSucceeded; } - + public Collection activeBridges() { return bridges.values(); } - }