Polish the code, remove warnings fix spelling errors in the docs, add some new javadocs, etc.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1391682 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-09-28 22:19:20 +00:00
parent 0a7c5dc93a
commit a80acbb1ad
17 changed files with 516 additions and 519 deletions

View File

@ -16,23 +16,17 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.io.IOException;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A demand forwarding bridge which works with multicast style transports where * A demand forwarding bridge which works with multicast style transports where
* a single Transport could be communicating with multiple remote brokers * a single Transport could be communicating with multiple remote brokers
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport { public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSupport {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDemandForwardingBridge.class);
public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) { Transport remoteBroker) {
@ -41,6 +35,5 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo
} }
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
// TODO is there much we can do here?
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -27,15 +28,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* implement conditional behaviour for queue consumers, * implement conditional behavior for queue consumers, allows replaying back to
* allows replaying back to origin if no consumers are present on the local broker * origin if no consumers are present on the local broker after a configurable
* after a configurable delay, irrespective of the networkTTL * delay, irrespective of the networkTTL Also allows rate limiting of messages
* Also allows rate limiting of messages through the network, useful for static includes * through the network, useful for static includes
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*/ */
public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory { public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
boolean replayWhenNoConsumers = false; boolean replayWhenNoConsumers = false;
int replayDelay = 0; int replayDelay = 0;
int rateLimit = 0; int rateLimit = 0;
@ -104,13 +105,15 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message); match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
if (match && LOG.isTraceEnabled()) { if (match && LOG.isTraceEnabled()) {
LOG.trace("Replaying [" + message.getMessageId() +"] for [" + message.getDestination() +"] back to origin in the absence of a local consumer"); LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin in the absence of a local consumer");
} }
} }
if (match && rateLimitExceeded()) { if (match && rateLimitExceeded()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount + ">" + rateLimit + "/" + rateDuration); LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+ ">" + rateLimit + "/" + rateDuration);
} }
match = false; match = false;
} }
@ -124,7 +127,7 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
} }
private boolean hasNotJustArrived(Message message) { private boolean hasNotJustArrived(Message message) {
return replayDelay ==0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis()); return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
} }
private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) { private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
@ -132,7 +135,8 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
for (Subscription sub : consumers) { for (Subscription sub : consumers) {
if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) { if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination() +"] to origin due to existing local consumer: " + sub.getConsumerInfo()); LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] to origin due to existing local consumer: " + sub.getConsumerInfo());
} }
return false; return false;
} }

View File

@ -19,8 +19,8 @@ package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -31,15 +31,13 @@ import org.slf4j.LoggerFactory;
/** /**
* Consolidates subscriptions * Consolidates subscriptions
*
*
*/ */
public class ConduitBridge extends DemandForwardingBridge { public class ConduitBridge extends DemandForwardingBridge {
private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class); private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class);
/** /**
* Constructor * Constructor
* *
* @param localBroker * @param localBroker
* @param remoteBroker * @param remoteBroker
*/ */
@ -57,38 +55,39 @@ public class ConduitBridge extends DemandForwardingBridge {
info.setSelector(null); info.setSelector(null);
return doCreateDemandSubscription(info); return doCreateDemandSubscription(info);
} }
protected boolean checkPaths(BrokerId[] first, BrokerId[] second) { protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
if (first == null || second == null) if (first == null || second == null) {
return true; return true;
if (Arrays.equals(first, second)) }
return true; if (Arrays.equals(first, second)) {
if (first[0].equals(second[0]) return true;
&& first[first.length - 1].equals(second[second.length - 1])) }
return false;
else if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length - 1])) {
return true; return false;
} else {
return true;
}
} }
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
// search through existing subscriptions and see if we have a match // search through existing subscriptions and see if we have a match
boolean matched = false; boolean matched = false;
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next(); for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination()); DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (filter.matches(info.getDestination())) { if (filter.matches(info.getDestination())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo() LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " +
+ " with sub: " + info.getConsumerId()); ds.getRemoteInfo() + " with sub: " + info.getConsumerId());
} }
// add the interest in the subscription // add the interest in the subscription
// ds.add(ds.getRemoteInfo().getConsumerId());
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) { if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
ds.add(info.getConsumerId()); ds.add(info.getConsumerId());
} }
matched = true; matched = true;
// continue - we want interest to any existing // continue - we want interest to any existing DemandSubscriptions
// DemandSubscriptions
} }
} }
return matched; return matched;
@ -98,8 +97,7 @@ public class ConduitBridge extends DemandForwardingBridge {
protected void removeDemandSubscription(ConsumerId id) throws IOException { protected void removeDemandSubscription(ConsumerId id) throws IOException {
List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>(); List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DemandSubscription ds = (DemandSubscription)i.next();
if (ds.remove(id)) { if (ds.remove(id)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo()); LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
@ -109,14 +107,12 @@ public class ConduitBridge extends DemandForwardingBridge {
tmpList.add(ds); tmpList.add(ds);
} }
} }
for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
DemandSubscription ds = i.next(); for (DemandSubscription ds : tmpList) {
removeSubscription(ds); removeSubscription(ds);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
} }
} }
} }
} }

View File

@ -20,11 +20,12 @@ import java.net.URI;
/** /**
* Abstraction that allows you to control which brokers a NetworkConnector connects bridges to. * Abstraction that allows you to control which brokers a NetworkConnector connects bridges to.
*
*
*/ */
public interface ConnectionFilter { public interface ConnectionFilter {
/** /**
* Connect the filter to a given location.
*
* @param location * @param location
* @return true if the network connector should establish a connection to the specified location. * @return true if the network connector should establish a connection to the specified location.
*/ */

View File

@ -21,7 +21,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.NetworkBridgeFilter; import org.apache.activemq.command.NetworkBridgeFilter;
/** /**
* implement default behaviour, filter that will not allow resend to origin * implement default behavior, filter that will not allow re-send to origin
* based on brokerPath and which respects networkTTL * based on brokerPath and which respects networkTTL
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean

View File

@ -17,18 +17,14 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Forwards messages from the local broker to the remote broker based on demand. * Forwards messages from the local broker to the remote broker based on demand.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class DemandForwardingBridge extends DemandForwardingBridgeSupport { public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridge.class);
public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
Transport remoteBroker) { Transport remoteBroker) {

View File

@ -115,8 +115,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration; protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
@ -324,7 +324,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// determine demand. // determine demand.
if (!configuration.isStaticBridge()) { if (!configuration.isStaticBridge()) {
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
// always dispatch advisories async so that we never block the producer broker if we are slow // always dispatch advisory message asynchronously so that we never block the producer
// broker if we are slow
demandConsumerInfo.setDispatchAsync(true); demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter(); String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) { if (configuration.isBridgeTempDestinations()) {
@ -382,6 +383,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ss.throwFirstException(); ss.throwFirstException();
} }
} }
if (remoteBrokerInfo != null) { if (remoteBrokerInfo != null) {
brokerService.getBroker().removeBroker(null, remoteBrokerInfo); brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
@ -579,7 +581,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
// in a cyclic network there can be multiple bridges per broker that can propagate // in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronise on a shared entity // a network subscription so there is a need to synchronize on a shared entity
synchronized (brokerService.getVmConnectorURI()) { synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) { if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -592,8 +594,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
} else if (data.getClass() == DestinationInfo.class) { } else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up // It's a destination info - we want to pass up information about temporary destinations
// information about temporary destinations
DestinationInfo destInfo = (DestinationInfo) data; DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath(); BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) { if (path != null && path.length >= networkTTL) {
@ -603,8 +604,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return; return;
} }
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to // Ignore this consumer as it's a consumer we locally sent to the broker.
// the broker.
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once"); LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
} }
@ -922,7 +922,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
// Are we not bridging temp destinations? // Are we not bridging temporary destinations?
if (destination.isTemporary()) { if (destination.isTemporary()) {
if (allowTemporary) { if (allowTemporary) {
return true; return true;
@ -1118,10 +1118,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
break; break;
} }
if ( region instanceof AbstractRegion ) if ( region instanceof AbstractRegion ) {
subs = ((AbstractRegion) region).getSubscriptions().values(); subs = ((AbstractRegion) region).getSubscriptions().values();
else } else {
subs = null; subs = null;
}
return subs; return subs;
} }
@ -1137,7 +1138,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
if (info.getDestination().isTemporary()) { if (info.getDestination().isTemporary()) {
// reset the local connection Id // reset the local connection Id
ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
} }
@ -1160,10 +1160,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
ConsumerInfo info = new ConsumerInfo(); ConsumerInfo info = new ConsumerInfo();
info.setDestination(destination); info.setDestination(destination);
// the remote info held by the DemandSubscription holds the original
// consumerId,
// the local info get's overwritten
// the remote info held by the DemandSubscription holds the original consumerId,
// the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null; DemandSubscription result = null;
try { try {

View File

@ -29,18 +29,16 @@ import org.slf4j.LoggerFactory;
/** /**
* Represents a network bridge interface * Represents a network bridge interface
*
*
*/ */
public class DemandSubscription { public class DemandSubscription {
private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
private final ConsumerInfo remoteInfo; private final ConsumerInfo remoteInfo;
private final ConsumerInfo localInfo; private final ConsumerInfo localInfo;
private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>(); private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean();
private AtomicInteger dispatched = new AtomicInteger(0);
private AtomicBoolean activeWaiter = new AtomicBoolean();
private NetworkBridgeFilter networkBridgeFilter; private NetworkBridgeFilter networkBridgeFilter;
DemandSubscription(ConsumerInfo info) { DemandSubscription(ConsumerInfo info) {
@ -52,7 +50,7 @@ public class DemandSubscription {
/** /**
* Increment the consumers associated with this subscription * Increment the consumers associated with this subscription
* *
* @param id * @param id
* @return true if added * @return true if added
*/ */
@ -62,7 +60,7 @@ public class DemandSubscription {
/** /**
* Increment the consumers associated with this subscription * Increment the consumers associated with this subscription
* *
* @param id * @param id
* @return true if removed * @return true if removed
*/ */
@ -108,7 +106,8 @@ public class DemandSubscription {
} }
} }
if (this.dispatched.get() > 0) { if (this.dispatched.get() > 0) {
LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried"); LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, " +
"expect potentially " + this.dispatched.get() + " duplicate deliveried");
} }
} }
} }

View File

@ -19,15 +19,15 @@ package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
@ -36,26 +36,22 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
/** /**
* A network connector which uses a discovery agent to detect the remote brokers * A network connector which uses a discovery agent to detect the remote brokers
* available and setup a connection to each available remote broker * available and setup a connection to each available remote broker
* *
* @org.apache.xbean.XBean element="networkConnector" * @org.apache.xbean.XBean element="networkConnector"
* *
*/ */
public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class);
private DiscoveryAgent discoveryAgent; private DiscoveryAgent discoveryAgent;
private Map<String, String> parameters; private Map<String, String> parameters;
public DiscoveryNetworkConnector() { public DiscoveryNetworkConnector() {
} }
@ -71,8 +67,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
} }
} }
public void onServiceAdd(DiscoveryEvent event) { public void onServiceAdd(DiscoveryEvent event) {
@ -89,20 +84,27 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
return; return;
} }
// Should we try to connect to that URI? // Should we try to connect to that URI?
synchronized (bridges) { synchronized (bridges) {
if( bridges.containsKey(uri) ) { if( bridges.containsKey(uri) ) {
LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); if (LOG.isDebugEnabled()) {
LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri );
}
return; return;
} }
} }
if (localURI.equals(uri)) { if (localURI.equals(uri)) {
LOG.debug("not connecting loopback: " + uri); if (LOG.isDebugEnabled()) {
LOG.debug("not connecting loopback: " + uri);
}
return; return;
} }
if (connectionFilter != null && !connectionFilter.connectTo(uri)) { if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
LOG.debug("connectionFilter disallows connection to: " + uri); if (LOG.isDebugEnabled()) {
LOG.debug("connectionFilter disallows connection to: " + uri);
}
return; return;
} }
@ -112,7 +114,10 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
} }
LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
if (LOG.isInfoEnabled()) {
LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
}
Transport remoteTransport; Transport remoteTransport;
Transport localTransport; Transport localTransport;
@ -123,7 +128,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
remoteTransport = TransportFactory.connect(connectUri); remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e); if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure exception: " + e, e);
}
return; return;
} }
try { try {
@ -131,7 +138,9 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} catch (Exception e) { } catch (Exception e) {
ServiceSupport.dispose(remoteTransport); ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage()); LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e); if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure exception: " + e, e);
}
return; return;
} }
} finally { } finally {
@ -147,11 +156,15 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
ServiceSupport.dispose(localTransport); ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport); ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e); LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
LOG.debug("Start failure exception: " + e, e); if (LOG.isDebugEnabled()) {
LOG.debug("Start failure exception: " + e, e);
}
try { try {
discoveryAgent.serviceFailed(event); discoveryAgent.serviceFailed(event);
} catch (IOException e1) { } catch (IOException e1) {
LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); if (LOG.isDebugEnabled()) {
LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1);
}
} }
} }
} }
@ -168,9 +181,8 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return; return;
} }
NetworkBridge bridge;
synchronized (bridges) { synchronized (bridges) {
bridge = bridges.remove(uri); bridges.remove(uri);
} }
} }
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
@ -29,17 +28,15 @@ import org.slf4j.LoggerFactory;
/** /**
* Consolidates subscriptions * Consolidates subscriptions
*
*
*/ */
public class DurableConduitBridge extends ConduitBridge { public class DurableConduitBridge extends ConduitBridge {
private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
/** /**
* Constructor * Constructor
* *
* @param configuration * @param configuration
* *
* @param localBroker * @param localBroker
* @param remoteBroker * @param remoteBroker
*/ */
@ -50,14 +47,13 @@ public class DurableConduitBridge extends ConduitBridge {
/** /**
* Subscriptions for these destinations are always created * Subscriptions for these destinations are always created
* *
*/ */
protected void setupStaticDestinations() { protected void setupStaticDestinations() {
super.setupStaticDestinations(); super.setupStaticDestinations();
ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
if (dests != null) { if (dests != null) {
for (int i = 0; i < dests.length; i++) { for (ActiveMQDestination dest : dests) {
ActiveMQDestination dest = dests[i];
if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
DemandSubscription sub = createDemandSubscription(dest); DemandSubscription sub = createDemandSubscription(dest);
if (dest.isTopic()) { if (dest.isTopic()) {
@ -88,8 +84,8 @@ public class DurableConduitBridge extends ConduitBridge {
info.setSubscriptionName(getSubscriberName(info.getDestination())); info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't // and override the consumerId with something unique so that it won't
// be removed if the durable subscriber (at the other end) goes away // be removed if the durable subscriber (at the other end) goes away
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
.getNextSequenceId())); consumerIdGenerator.getNextSequenceId()));
} }
info.setSelector(null); info.setSelector(null);
return doCreateDemandSubscription(info); return doCreateDemandSubscription(info);
@ -102,8 +98,7 @@ public class DurableConduitBridge extends ConduitBridge {
protected boolean doesConsumerExist(ActiveMQDestination dest) { protected boolean doesConsumerExist(ActiveMQDestination dest) {
DestinationFilter filter = DestinationFilter.parseFilter(dest); DestinationFilter filter = DestinationFilter.parseFilter(dest);
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DemandSubscription ds = (DemandSubscription)i.next();
if (filter.matches(ds.getLocalInfo().getDestination())) { if (filter.matches(ds.getLocalInfo().getDestination())) {
return true; return true;
} }

View File

@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class ForwardingBridge implements Service { public class ForwardingBridge implements Service {
@ -162,8 +161,11 @@ public class ForwardingBridge implements Service {
topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
localBroker.oneway(topicConsumerInfo); localBroker.oneway(topicConsumerInfo);
} }
LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ " has been established."); if (LOG.isInfoEnabled()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ " has been established.");
}
} }
public void stop() throws Exception { public void stop() throws Exception {
@ -186,7 +188,9 @@ public class ForwardingBridge implements Service {
public void serviceRemoteException(Throwable error) { public void serviceRemoteException(Throwable error) {
LOG.info("Unexpected remote exception: " + error); LOG.info("Unexpected remote exception: " + error);
LOG.debug("Exception trace: ", error); if (LOG.isDebugEnabled()) {
LOG.debug("Exception trace: ", error);
}
} }
protected void serviceRemoteCommand(Command command) { protected void serviceRemoteCommand(Command command) {
@ -399,5 +403,4 @@ public class ForwardingBridge implements Service {
public boolean isUseCompression() { public boolean isUseCompression() {
return useCompression; return useCompression;
} }
} }

View File

@ -19,7 +19,6 @@ package org.apache.activemq.network;
import java.net.URI; import java.net.URI;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.naming.CommunicationException; import javax.naming.CommunicationException;
@ -42,415 +41,403 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* class to create dynamic network connectors listed in an directory * class to create dynamic network connectors listed in an directory server
* server using the LDAP v3 protocol as defined in RFC 2251, the * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
* entries listed in the directory server must implement the ipHost * directory server must implement the ipHost and ipService objectClasses as
* and ipService objectClasses as defined in RFC 2307. * defined in RFC 2307.
* *
* @author Trevor Pounds
* @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a> * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
* @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a> * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
* *
* @org.apache.xbean.XBean element="ldapNetworkConnector" * @org.apache.xbean.XBean element="ldapNetworkConnector"
*/ */
public class LdapNetworkConnector public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
extends NetworkConnector private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
implements NamespaceChangeListener,
ObjectChangeListener
{
private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
// force returned entries to implement the ipHost and ipService object classes (RFC 2307) // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))"; private static final String REQUIRED_OBJECT_CLASS_FILTER =
"(&(objectClass=ipHost)(objectClass=ipService))";
// connection // connection
private URI[] availableURIs = null; private URI[] availableURIs = null;
private int availableURIsIndex = 0; private int availableURIsIndex = 0;
private String base = null; private String base = null;
private boolean failover = false; private boolean failover = false;
private long curReconnectDelay = 1000; /* 1 sec */ private long curReconnectDelay = 1000; /* 1 sec */
private long maxReconnectDelay = 30000; /* 30 sec */ private long maxReconnectDelay = 30000; /* 30 sec */
// authentication // authentication
private String user = null; private String user = null;
private String password = null; private String password = null;
private boolean anonymousAuthentication = false; private boolean anonymousAuthentication = false;
// search // search
private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */); private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER; private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
private boolean searchEventListener = false; private boolean searchEventListener = false;
// connector management // connector management
private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap(); private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
private Map<URI, Integer> referenceMap = new ConcurrentHashMap(); private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
private Map<String, URI> uuidMap = new ConcurrentHashMap(); private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
// local context // local context
private DirContext context = null; private DirContext context = null;
//currently in use URI // currently in use URI
private URI ldapURI = null; private URI ldapURI = null;
/** /**
* returns the next URI from the configured list * returns the next URI from the configured list
* *
* @return random URI from the configured list * @return random URI from the configured list
*/ */
public URI getUri() public URI getUri() {
{ return availableURIs[++availableURIsIndex % availableURIs.length]; } return availableURIs[++availableURIsIndex % availableURIs.length];
}
/** /**
* sets the LDAP server URI * sets the LDAP server URI
* *
* @param _uri LDAP server URI * @param _uri
*/ * LDAP server URI
public void setUri(URI _uri) */
throws Exception public void setUri(URI uri) throws Exception {
{ CompositeData data = URISupport.parseComposite(uri);
CompositeData data = URISupport.parseComposite(_uri); if (data.getScheme().equals("failover")) {
if(data.getScheme().equals("failover")) availableURIs = data.getComponents();
{ failover = true;
availableURIs = data.getComponents(); } else {
failover = true; availableURIs = new URI[] { uri };
} }
else }
{ availableURIs = new URI[]{ _uri }; }
}
/** /**
* sets the base LDAP dn used for lookup operations * sets the base LDAP dn used for lookup operations
* *
* @param _base LDAP base dn * @param _base
*/ * LDAP base dn
public void setBase(String _base) */
{ base = _base; } public void setBase(String base) {
this.base = base;
}
/** /**
* sets the LDAP user for access credentials * sets the LDAP user for access credentials
* *
* @param _user LDAP dn of user * @param _user
*/ * LDAP dn of user
public void setUser(String _user) */
{ user = _user; } public void setUser(String user) {
this.user = user;
}
/** /**
* sets the LDAP password for access credentials * sets the LDAP password for access credentials
* *
* @param _password user password * @param _password
*/ * user password
public void setPassword(String _password) */
{ password = _password; } public void setPassword(String password) {
this.password = password;
}
/** /**
* sets LDAP anonymous authentication access credentials * sets LDAP anonymous authentication access credentials
* *
* @param _anonymousAuthentication set to true to use anonymous authentication * @param _anonymousAuthentication
*/ * set to true to use anonymous authentication
public void setAnonymousAuthentication(boolean _anonymousAuthentication) */
{ anonymousAuthentication = _anonymousAuthentication; } public void setAnonymousAuthentication(boolean anonymousAuthentication) {
this.anonymousAuthentication = anonymousAuthentication;
}
/** /**
* sets the LDAP search scope * sets the LDAP search scope
* *
* @param _searchScope LDAP JNDI search scope * @param _searchScope
*/ * LDAP JNDI search scope
public void setSearchScope(String _searchScope) */
throws Exception public void setSearchScope(String searchScope) throws Exception {
{ int scope;
int scope; if (searchScope.equals("OBJECT_SCOPE")) {
if(_searchScope.equals("OBJECT_SCOPE")) scope = SearchControls.OBJECT_SCOPE;
{ scope = SearchControls.OBJECT_SCOPE; } } else if (searchScope.equals("ONELEVEL_SCOPE")) {
else if(_searchScope.equals("ONELEVEL_SCOPE")) scope = SearchControls.ONELEVEL_SCOPE;
{ scope = SearchControls.ONELEVEL_SCOPE; } } else if (searchScope.equals("SUBTREE_SCOPE")) {
else if(_searchScope.equals("SUBTREE_SCOPE")) scope = SearchControls.SUBTREE_SCOPE;
{ scope = SearchControls.SUBTREE_SCOPE; } } else {
else throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
{ throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); } }
searchControls.setSearchScope(scope); searchControls.setSearchScope(scope);
} }
/** /**
* sets the LDAP search filter as defined in RFC 2254 * sets the LDAP search filter as defined in RFC 2254
* *
* @param _searchFilter LDAP search filter * @param _searchFilter
* @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a> * LDAP search filter
*/ * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
public void setSearchFilter(String _searchFilter) */
{ searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; } public void setSearchFilter(String searchFilter) {
this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
}
/** /**
* enables/disable a persistent search to the LDAP server as defined * enables/disable a persistent search to the LDAP server as defined in
* in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3) * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
* *
* @param _searchEventListener enable = true, disable = false (default) * @param _searchEventListener
* @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a> * enable = true, disable = false (default)
*/ * @see <a
public void setSearchEventListener(boolean _searchEventListener) * href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
{ searchEventListener = _searchEventListener; } */
public void setSearchEventListener(boolean searchEventListener) {
this.searchEventListener = searchEventListener;
}
/** /**
* start the connector * start the connector
*/ */
public void start() public void start() throws Exception {
throws Exception LOG.info("connecting...");
{ Hashtable<String, String> env = new Hashtable<String, String>();
LOG.info("connecting..."); env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
Hashtable<String, String> env = new Hashtable(); this.ldapURI = getUri();
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); LOG.debug(" URI [" + this.ldapURI + "]");
this.ldapURI = getUri(); env.put(Context.PROVIDER_URL, this.ldapURI.toString());
LOG.debug(" URI [" + this.ldapURI + "]"); if (anonymousAuthentication) {
env.put(Context.PROVIDER_URL, this.ldapURI.toString()); LOG.debug(" login credentials [anonymous]");
if(anonymousAuthentication) env.put(Context.SECURITY_AUTHENTICATION, "none");
{ } else {
LOG.debug(" login credentials [anonymous]"); LOG.debug(" login credentials [" + user + ":******]");
env.put(Context.SECURITY_AUTHENTICATION, "none"); env.put(Context.SECURITY_PRINCIPAL, user);
} env.put(Context.SECURITY_CREDENTIALS, password);
else }
{ boolean isConnected = false;
LOG.debug(" login credentials [" + user + ":******]"); while (!isConnected) {
env.put(Context.SECURITY_PRINCIPAL, user); try {
env.put(Context.SECURITY_CREDENTIALS, password); context = new InitialDirContext(env);
} isConnected = true;
boolean isConnected = false; } catch (CommunicationException err) {
while(!isConnected) if (failover) {
{ this.ldapURI = getUri();
try LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
{ env.put(Context.PROVIDER_URL, this.ldapURI.toString());
context = new InitialDirContext(env); Thread.sleep(curReconnectDelay);
isConnected = true; curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
} } else {
catch(CommunicationException err) throw err;
{ }
if(failover)
{
this.ldapURI = getUri();
LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + this.ldapURI.toString() + "]");
env.put(Context.PROVIDER_URL, this.ldapURI.toString());
Thread.sleep(curReconnectDelay);
curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
} }
else }
{ throw err; }
}
}
// add connectors from search results // add connectors from search results
LOG.info("searching for network connectors..."); LOG.info("searching for network connectors...");
LOG.debug(" base [" + base + "]"); LOG.debug(" base [" + base + "]");
LOG.debug(" filter [" + searchFilter + "]"); LOG.debug(" filter [" + searchFilter + "]");
LOG.debug(" scope [" + searchControls.getSearchScope() + "]"); LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls); NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
while(results.hasMore()) while (results.hasMore()) {
{ addConnector(results.next()); } addConnector(results.next());
}
// register persistent search event listener // register persistent search event listener
if(searchEventListener) if (searchEventListener) {
{ LOG.info("registering persistent search listener...");
LOG.info("registering persistent search listener..."); EventDirContext eventContext = (EventDirContext) context.lookup("");
EventDirContext eventContext = (EventDirContext)context.lookup(""); eventContext.addNamingListener(base, searchFilter, searchControls, this);
eventContext.addNamingListener(base, searchFilter, searchControls, this); } else { // otherwise close context (i.e. connection as it is no longer needed)
} context.close();
else // otherwise close context (i.e. connection as it is no longer needed) }
{ context.close(); } }
}
/** /**
* stop the connector * stop the connector
*/ */
public void stop() public void stop() throws Exception {
throws Exception LOG.info("stopping context...");
{ for (NetworkConnector connector : connectorMap.values()) {
LOG.info("stopping context..."); connector.stop();
for(NetworkConnector connector : connectorMap.values()) }
{ connector.stop(); } connectorMap.clear();
connectorMap.clear(); referenceMap.clear();
referenceMap.clear(); uuidMap.clear();
uuidMap.clear(); context.close();
context.close(); }
}
public String toString() { public String toString() {
return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
} }
/** /**
* add connector of the given URI * add connector of the given URI
* *
* @param result * @param result
* search result of connector to add * search result of connector to add
*/ */
protected synchronized void addConnector(SearchResult result) protected synchronized void addConnector(SearchResult result) throws Exception {
throws Exception String uuid = toUUID(result);
{ if (uuidMap.containsKey(uuid)) {
String uuid = toUUID(result); LOG.warn("connector already regsitered for UUID [" + uuid + "]");
if(uuidMap.containsKey(uuid)) return;
{ }
LOG.warn("connector already regsitered for UUID [" + uuid + "]");
return;
}
URI connectorURI = toURI(result); URI connectorURI = toURI(result);
if(connectorMap.containsKey(connectorURI)) if (connectorMap.containsKey(connectorURI)) {
{ int referenceCount = referenceMap.get(connectorURI) + 1;
int referenceCount = referenceMap.get(connectorURI) + 1; LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]"); referenceMap.put(connectorURI, referenceCount);
referenceMap.put(connectorURI, referenceCount); uuidMap.put(uuid, connectorURI);
uuidMap.put(uuid, connectorURI); return;
return; }
}
// FIXME: disable JMX listing of LDAP managed connectors, we will // FIXME: disable JMX listing of LDAP managed connectors, we will
// want to map/manage these differently in the future // want to map/manage these differently in the future
// boolean useJMX = getBrokerService().isUseJmx(); // boolean useJMX = getBrokerService().isUseJmx();
// getBrokerService().setUseJmx(false); // getBrokerService().setUseJmx(false);
NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI); NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
// getBrokerService().setUseJmx(useJMX); // getBrokerService().setUseJmx(useJMX);
// propogate std connector properties that may have been set via XML // Propagate standard connector properties that may have been set via XML
connector.setDynamicOnly(isDynamicOnly()); connector.setDynamicOnly(isDynamicOnly());
connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
connector.setNetworkTTL(getNetworkTTL()); connector.setNetworkTTL(getNetworkTTL());
connector.setConduitSubscriptions(isConduitSubscriptions()); connector.setConduitSubscriptions(isConduitSubscriptions());
connector.setExcludedDestinations(getExcludedDestinations()); connector.setExcludedDestinations(getExcludedDestinations());
connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations()); connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
connector.setDuplex(isDuplex()); connector.setDuplex(isDuplex());
// XXX: set in the BrokerService.startAllConnectors method and is // XXX: set in the BrokerService.startAllConnectors method and is
// required to prevent remote broker exceptions upon connection // required to prevent remote broker exceptions upon connection
connector.setLocalUri(getBrokerService().getVmConnectorURI()); connector.setLocalUri(getBrokerService().getVmConnectorURI());
connector.setBrokerName(getBrokerService().getBrokerName()); connector.setBrokerName(getBrokerService().getBrokerName());
connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations()); connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
// start network connector // start network connector
connectorMap.put(connectorURI, connector); connectorMap.put(connectorURI, connector);
referenceMap.put(connectorURI, 1); referenceMap.put(connectorURI, 1);
uuidMap.put(uuid, connectorURI); uuidMap.put(uuid, connectorURI);
connector.start(); connector.start();
LOG.info("connector added with URI [" + connectorURI + "]"); LOG.info("connector added with URI [" + connectorURI + "]");
} }
/** /**
* remove connector of the given URI * remove connector of the given URI
* *
* @param result search result of connector to remove * @param result
*/ * search result of connector to remove
protected synchronized void removeConnector(SearchResult result) */
throws Exception protected synchronized void removeConnector(SearchResult result) throws Exception {
{ String uuid = toUUID(result);
String uuid = toUUID(result); if (!uuidMap.containsKey(uuid)) {
if(!uuidMap.containsKey(uuid)) LOG.warn("connector not regsitered for UUID [" + uuid + "]");
{ return;
LOG.warn("connector not regsitered for UUID [" + uuid + "]"); }
return;
}
URI connectorURI = uuidMap.get(uuid); URI connectorURI = uuidMap.get(uuid);
if(!connectorMap.containsKey(connectorURI)) if (!connectorMap.containsKey(connectorURI)) {
{ LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
LOG.warn("connector not regisitered for URI [" + connectorURI + "]"); return;
return; }
}
int referenceCount = referenceMap.get(connectorURI) - 1; int referenceCount = referenceMap.get(connectorURI) - 1;
referenceMap.put(connectorURI, referenceCount); referenceMap.put(connectorURI, referenceCount);
uuidMap.remove(uuid); uuidMap.remove(uuid);
LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]"); LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
if(referenceCount > 0) if (referenceCount > 0) {
{ return; } return;
}
NetworkConnector connector = connectorMap.remove(connectorURI); NetworkConnector connector = connectorMap.remove(connectorURI);
connector.stop(); connector.stop();
LOG.info("connector removed with URI [" + connectorURI + "]"); LOG.info("connector removed with URI [" + connectorURI + "]");
} }
/** /**
* convert search result into URI * convert search result into URI
* *
* @param result search result to convert to URI * @param result
*/ * search result to convert to URI
protected URI toURI(SearchResult result) */
throws Exception protected URI toURI(SearchResult result) throws Exception {
{ Attributes attributes = result.getAttributes();
Attributes attributes = result.getAttributes(); String address = (String) attributes.get("iphostnumber").get();
String address = (String)attributes.get("iphostnumber").get(); String port = (String) attributes.get("ipserviceport").get();
String port = (String)attributes.get("ipserviceport").get(); String protocol = (String) attributes.get("ipserviceprotocol").get();
String protocol = (String)attributes.get("ipserviceprotocol").get(); URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")"); LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]"); return connectorURI;
return connectorURI; }
}
/** /**
* convert search result into URI * convert search result into URI
* *
* @param result search result to convert to URI * @param result
*/ * search result to convert to URI
protected String toUUID(SearchResult result) */
{ protected String toUUID(SearchResult result) {
String uuid = result.getNameInNamespace(); String uuid = result.getNameInNamespace();
LOG.debug("retrieved UUID from SearchResult [" + uuid + "]"); LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
return uuid; return uuid;
} }
/** /**
* invoked when an entry has been added during a persistent search * invoked when an entry has been added during a persistent search
*/ */
public void objectAdded(NamingEvent event) public void objectAdded(NamingEvent event) {
{ LOG.debug("entry added");
LOG.debug("entry added"); try {
try addConnector((SearchResult) event.getNewBinding());
{ addConnector((SearchResult)event.getNewBinding()); } } catch (Exception err) {
catch(Exception err) LOG.error("ERR: caught unexpected exception", err);
{ LOG.error("ERR: caught unexpected exception", err); } }
} }
/** /**
* invoked when an entry has been removed during a persistent search * invoked when an entry has been removed during a persistent search
*/ */
public void objectRemoved(NamingEvent event) public void objectRemoved(NamingEvent event) {
{ LOG.debug("entry removed");
LOG.debug("entry removed"); try {
try removeConnector((SearchResult) event.getOldBinding());
{ removeConnector((SearchResult)event.getOldBinding()); } } catch (Exception err) {
catch(Exception err) LOG.error("ERR: caught unexpected exception", err);
{ LOG.error("ERR: caught unexpected exception", err); } }
} }
/** /**
* invoked when an entry has been renamed during a persistent search * invoked when an entry has been renamed during a persistent search
*/ */
public void objectRenamed(NamingEvent event) public void objectRenamed(NamingEvent event) {
{ LOG.debug("entry renamed");
LOG.debug("entry renamed"); // XXX: getNameInNamespace method does not seem to work properly,
// XXX: getNameInNamespace method does not seem to work properly, // but getName seems to provide the result we want
// but getName seems to provide the result we want String uuidOld = event.getOldBinding().getName();
String uuidOld = event.getOldBinding().getName(); String uuidNew = event.getNewBinding().getName();
String uuidNew = event.getNewBinding().getName(); URI connectorURI = uuidMap.remove(uuidOld);
URI connectorURI = uuidMap.remove(uuidOld); uuidMap.put(uuidNew, connectorURI);
uuidMap.put(uuidNew, connectorURI); LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]"); }
}
/** /**
* invoked when an entry has been changed during a persistent search * invoked when an entry has been changed during a persistent search
*/ */
public void objectChanged(NamingEvent event) public void objectChanged(NamingEvent event) {
{ LOG.debug("entry changed");
LOG.debug("entry changed"); try {
try SearchResult result = (SearchResult) event.getNewBinding();
{ removeConnector(result);
SearchResult result = (SearchResult)event.getNewBinding(); addConnector(result);
removeConnector(result); } catch (Exception err) {
addConnector(result); LOG.error("ERR: caught unexpected exception", err);
} }
catch(Exception err) }
{ LOG.error("ERR: caught unexpected exception", err); }
}
/** /**
* invoked when an exception has occurred during a persistent search * invoked when an exception has occurred during a persistent search
*/ */
public void namingExceptionThrown(NamingExceptionEvent event) public void namingExceptionThrown(NamingExceptionEvent event) {
{ LOG.error("ERR: caught unexpected exception", event.getException()); } LOG.error("ERR: caught unexpected exception", event.getException());
}
} }

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.util.HashMap;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.NetworkBridgeView; import org.apache.activemq.broker.jmx.NetworkBridgeView;
@ -24,11 +30,6 @@ import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.HashMap;
import java.util.Map;
public class MBeanNetworkListener implements NetworkBridgeListener { public class MBeanNetworkListener implements NetworkBridgeListener {
private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class); private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class);
@ -44,7 +45,6 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
@Override @Override
public void bridgeFailed() { public void bridgeFailed() {
} }
@Override @Override
@ -81,7 +81,6 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
} }
} }
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList()); Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String) map.get("BrokerName")) + "," + "Type=NetworkBridge," return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String) map.get("BrokerName")) + "," + "Type=NetworkBridge,"

View File

@ -26,13 +26,12 @@ import org.apache.activemq.util.ServiceStopper;
* A network connector which uses some kind of multicast-like transport that * A network connector which uses some kind of multicast-like transport that
* communicates with potentially many remote brokers over a single logical * communicates with potentially many remote brokers over a single logical
* {@link Transport} instance such as when using multicast. * {@link Transport} instance such as when using multicast.
* *
* This implementation does not depend on multicast at all; any other group * This implementation does not depend on multicast at all; any other group
* based transport could be used. * based transport could be used.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class MulticastNetworkConnector extends NetworkConnector { public class MulticastNetworkConnector extends NetworkConnector {
@ -151,5 +150,4 @@ public class MulticastNetworkConnector extends NetworkConnector {
bridge.setBrokerService(getBrokerService()); bridge.setBrokerService(getBrokerService());
return bridge; return bridge;
} }
} }

View File

@ -17,48 +17,70 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.Service;
import org.apache.activemq.Service;
/** /**
* Represents a network bridge interface * Represents a network bridge interface
*
*
*/ */
public interface NetworkBridge extends Service { public interface NetworkBridge extends Service {
/** /**
* Service an exception * Service an exception received from the Remote Broker connection.
* @param error * @param error
*/ */
void serviceRemoteException(Throwable error); void serviceRemoteException(Throwable error);
/** /**
* servicee an exception * Service an exception received from the Local Broker connection.
* @param error * @param error
*/ */
void serviceLocalException(Throwable error); void serviceLocalException(Throwable error);
/** /**
* Set the NetworkBridgeFailedListener * Set the NetworkBridgeFailedListener
* @param listener * @param listener
*/ */
void setNetworkBridgeListener(NetworkBridgeListener listener); void setNetworkBridgeListener(NetworkBridgeListener listener);
/**
* @return the network address of the remote broker connection.
*/
String getRemoteAddress(); String getRemoteAddress();
/**
* @return the name of the remote broker this bridge is connected to.
*/
String getRemoteBrokerName(); String getRemoteBrokerName();
/**
* @return the network address of the local broker connection.
*/
String getLocalAddress(); String getLocalAddress();
/**
* @return the name of the local broker this bridge is connected to.
*/
String getLocalBrokerName(); String getLocalBrokerName();
/**
* @return the current number of enqueues this bridge has.
*/
long getEnqueueCounter(); long getEnqueueCounter();
/**
* @return the current number of dequeues this bridge has.
*/
long getDequeueCounter(); long getDequeueCounter();
/**
* @param objectName
* The ObjectName assigned to this bridge in the MBean server.
*/
void setMbeanObjectName(ObjectName objectName); void setMbeanObjectName(ObjectName objectName);
/**
* @return the MBean name used to identify this bridge in the MBean server.
*/
ObjectName getMbeanObjectName(); ObjectName getMbeanObjectName();
} }

View File

@ -24,8 +24,6 @@ import org.apache.activemq.command.ConsumerInfo;
/** /**
* Configuration for a NetworkBridge * Configuration for a NetworkBridge
*
*
*/ */
public class NetworkBridgeConfiguration { public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true; private boolean conduitSubscriptions = true;
@ -233,7 +231,7 @@ public class NetworkBridgeConfiguration {
} else { } else {
return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">";
} }
} else { } else {
// prepend consumer advisory prefix // prepend consumer advisory prefix
// to keep backward compatibility // to keep backward compatibility
if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX)) { if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX)) {
@ -292,8 +290,6 @@ public class NetworkBridgeConfiguration {
this.staticallyIncludedDestinations = staticallyIncludedDestinations; this.staticallyIncludedDestinations = staticallyIncludedDestinations;
} }
public boolean isSuppressDuplicateQueueSubscriptions() { public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions; return suppressDuplicateQueueSubscriptions;
} }

View File

@ -46,7 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* * Connector class for bridging broker networks.
*/ */
public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
@ -54,7 +54,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
protected URI localURI; protected URI localURI;
protected ConnectionFilter connectionFilter; protected ConnectionFilter connectionFilter;
protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>(); protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
protected ServiceSupport serviceSupport = new ServiceSupport() { protected ServiceSupport serviceSupport = new ServiceSupport() {
protected void doStart() throws Exception { protected void doStart() throws Exception {
@ -72,7 +72,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>(); private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private BrokerService brokerService; private BrokerService brokerService;
private ObjectName objectName; private ObjectName objectName;
public NetworkConnector() { public NetworkConnector() {
} }
@ -91,7 +91,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
/** /**
* @return Returns the durableDestinations. * @return Returns the durableDestinations.
*/ */
public Set getDurableDestinations() { public Set<ActiveMQDestination> getDurableDestinations() {
return durableDestinations; return durableDestinations;
} }
@ -179,14 +179,14 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests); result.setStaticallyIncludedDestinations(dests);
if (durableDestinations != null) { if (durableDestinations != null) {
HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>(); HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
for (ActiveMQDestination d : durableDestinations) { for (ActiveMQDestination d : durableDestinations) {
if( d.isTopic() ) { if( d.isTopic() ) {
topics.add(d); topics.add(d);
} }
} }
ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
dest = (ActiveMQDestination[])topics.toArray(dest); dest = (ActiveMQDestination[])topics.toArray(dest);
result.setDurableDestinations(dest); result.setDurableDestinations(dest);
@ -218,7 +218,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
} }
public boolean isStarted() { public boolean isStarted() {
return serviceSupport.isStarted(); return serviceSupport.isStarted();
} }
public boolean isStopped() { public boolean isStopped() {
@ -269,9 +269,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
} }
} }
@SuppressWarnings("unchecked")
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
ObjectName connectorName = getObjectName(); ObjectName connectorName = getObjectName();
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList()); Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
@ -294,9 +292,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
} }
return removeSucceeded; return removeSucceeded;
} }
public Collection<NetworkBridge> activeBridges() { public Collection<NetworkBridge> activeBridges() {
return bridges.values(); return bridges.values();
} }
} }