Some additional code cleanup, doc fixes, etc.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-29 22:14:10 +00:00
parent e35c39bbcf
commit 5fc2535edf
1 changed files with 225 additions and 218 deletions

View File

@ -127,7 +127,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected NetworkBridgeConfiguration configuration; protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null}; protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected Object brokerInfoMutex = new Object(); protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId; protected BrokerId remoteBrokerId;
@ -224,7 +224,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.warn("Caught exception from remote start", e); LOG.warn("Caught exception from remote start", e);
} }
} else { } else {
LOG.warn ("Bridge was disposed before the start() method was fully executed."); LOG.warn("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException(); throw new TransportDisposedIOException();
} }
} }
@ -288,16 +288,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// sync requests that may fail // sync requests that may fail
Object resp = localBroker.request(localConnectionInfo); Object resp = localBroker.request(localConnectionInfo);
if (resp instanceof ExceptionResponse) { if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse)resp).getException(); throw ((ExceptionResponse) resp).getException();
} }
localSessionInfo = new SessionInfo(localConnectionInfo, 1); localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo); localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) { if (configuration.isDuplex()) {
// separate inbound chanel for forwards so we don't contend with outbound dispatch on same connection // separate in-bound chamnel for forwards so we don't
// contend with out-bound dispatch on same connection
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName()); duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
+ configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName()); duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword()); duplexLocalConnectionInfo.setPassword(configuration.getPassword());
@ -308,7 +310,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// sync requests that may fail // sync requests that may fail
resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
if (resp instanceof ExceptionResponse) { if (resp instanceof ExceptionResponse) {
throw ((ExceptionResponse)resp).getException(); throw ((ExceptionResponse) resp).getException();
} }
SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
@ -323,7 +325,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
} else { } else {
LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed."); LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
} }
startedLatch.countDown(); startedLatch.countDown();
localStartedLatch.countDown(); localStartedLatch.countDown();
@ -334,7 +336,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!disposed.get()) { if (!disposed.get()) {
setupStaticDestinations(); setupStaticDestinations();
} else { } else {
LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName
+ ") was interrupted during establishment.");
} }
} }
} }
@ -374,12 +377,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
producerInfo = new ProducerInfo(remoteSessionInfo, 1); producerInfo = new ProducerInfo(remoteSessionInfo, 1);
producerInfo.setResponseRequired(false); producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo); remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to // Listen to consumer advisory messages on the remote broker to determine demand.
// determine demand.
if (!configuration.isStaticBridge()) { if (!configuration.isStaticBridge()) {
demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
// always dispatch advisory message asynchronously so that we never block the producer // always dispatch advisory message asynchronously so that
// broker if we are slow // we never block the producer broker if we are slow
demandConsumerInfo.setDispatchAsync(true); demandConsumerInfo.setDispatchAsync(true);
String advisoryTopic = configuration.getDestinationFilter(); String advisoryTopic = configuration.getDestinationFilter();
if (configuration.isBridgeTempDestinations()) { if (configuration.isBridgeTempDestinations()) {
@ -495,10 +497,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
} }
if (configuration.getDynamicallyIncludedDestinations() != null) { if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations() dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
.toArray( new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
.size()]);
} }
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Error mapping remote destinations", t); LOG.error("Error mapping remote destinations", t);
@ -514,7 +514,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else { } else {
if (isDuplex()) { if (isDuplex()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " duplex command type: "+ command.getCommandId()); LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId());
} }
if (command.isMessage()) { if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command; final ActiveMQMessage message = (ActiveMQMessage) command;
@ -526,11 +526,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!isPermissableDestination(message.getDestination(), true)) { if (!isPermissableDestination(message.getDestination(), true)) {
return; return;
} }
// message being forwarded - we need to propagate the response to our local send // message being forwarded - we need to
// propagate the response to our local send
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
final int correlationId = message.getCommandId(); final int correlationId = message.getCommandId();
@Override @Override
public void onCompletion(FutureResponse resp) { public void onCompletion(FutureResponse resp) {
try { try {
@ -641,10 +643,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (path != null && path.length >= networkTTL) { if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
+ " network hops only : " + info);
} }
return; return;
} }
if (contains(path, localBrokerPath[0])) { if (contains(path, localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to the broker. // Ignore this consumer as it's a consumer we locally sent to the broker.
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -652,10 +656,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
return; return;
} }
if (!isPermissableDestination(info.getDestination())) { if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permitted or in the excluded list // ignore if not in the permitted or in the excluded list
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination()
+ " is not permiited :" + info);
} }
return; return;
} }
@ -669,7 +675,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ " as already subscribed to matching destination : " + info);
} }
} }
} }
@ -698,10 +705,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo); LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker
+ " from " + remoteBrokerName + ", destination: " + destInfo);
} }
if (destInfo.isRemoveOperation()) { if (destInfo.isRemoveOperation()) {
// Serialize with removeSub operations such that all removeSub advisories are generated // Serialize with removeSub operations such that all removeSub advisories
// are generated
serialExecutor.execute(new Runnable() { serialExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -729,8 +738,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
if (!disposed.get()) { if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary() ) { if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
// not a reason to terminate the bridge - temps can disappear with pending sends as the demand sub may outlive the remote dest // not a reason to terminate the bridge - temps can disappear with
// pending sends as the demand sub may outlive the remote dest
if (messageDispatch != null) { if (messageDispatch != null) {
LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error); LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
try { try {
@ -771,9 +781,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
advisoryBroker.fireAdvisory(context, advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), advisoryMessage);
messageDispatch.getMessage(), null, advisoryMessage);
} }
} catch (Exception e) { } catch (Exception e) {
@ -798,9 +807,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeSubscription(final DemandSubscription sub) throws IOException { protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) { if (sub != null) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " remove local subscription:" LOG.trace(configuration.getBrokerName() + " remove local subscription:" + sub.getLocalInfo().getConsumerId() + " for remote "
+ sub.getLocalInfo().getConsumerId() + sub.getRemoteInfo().getConsumerId());
+ " for remote " + sub.getRemoteInfo().getConsumerId());
} }
// ensure not available for conduit subs pending removal // ensure not available for conduit subs pending removal
@ -808,8 +816,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses // continue removal in separate thread to free up this thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialised with removeDestinations // Serialize with removeDestination operations so that removeSubs are serialized with
// such that all removeSub advisories are generated // removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() { serialExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -852,7 +860,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (suppressMessageDispatch(md, sub)) { if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage()); LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
+ " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+ ", message: " + md.getMessage());
} }
// still ack as it may be durable // still ack as it may be durable
try { try {
@ -865,14 +875,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Message message = configureMessage(md); Message message = configureMessage(md);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") "
+ (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination "
+ message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
} }
if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) { if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
// If the message was originally sent using async // If the message was originally sent using async send, we will
// send, we will preserve that QOS // preserve that QOS by bridging it using an async send (small chance
// by bridging it using an async send (small chance
// of message loss). // of message loss).
try { try {
remoteBroker.oneway(message); remoteBroker.oneway(message);
@ -884,9 +895,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else { } else {
// The message was not sent using async send, so we // The message was not sent using async send, so we should only
// should only ack the local // ack the local broker when we get confirmation that the remote
// broker when we get confirmation that the remote
// broker has received the message. // broker has received the message.
ResponseCallback callback = new ResponseCallback() { ResponseCallback callback = new ResponseCallback() {
@Override @Override
@ -909,11 +919,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}; };
remoteBroker.asyncRequest(message, callback); remoteBroker.asyncRequest(message, callback);
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: "
+ md.getMessage());
} }
} }
} else if (command.isBrokerInfo()) { } else if (command.isBrokerInfo()) {
@ -940,10 +950,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
synchronized (brokerInfoMutex) {
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:"
+ remoteBrokerId);
}
safeWaitUntilStarted();
ServiceSupport.dispose(this);
}
}
}
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:"
+ remoteBrokerId);
}
ServiceSupport.dispose(this);
}
}
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
}
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
boolean suppress = false; boolean suppress = false;
// for durable subs, suppression via filter leaves dangling acks so we need to // for durable subs, suppression via filter leaves dangling acks so we
// check here and allow the ack irrespective // need to check here and allow the ack irrespective
if (sub.getLocalInfo().isDurable()) { if (sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage()); messageEvalContext.setMessageReference(md.getMessage());
@ -953,96 +999,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return suppress; return suppress;
} }
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
/**
* @param dynamicallyIncludedDestinations The
* dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public ActiveMQDestination[] getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public ActiveMQDestination[] getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations
* to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
/**
* @return Returns the durableDestinations.
*/
public ActiveMQDestination[] getDurableDestinations() {
return durableDestinations;
}
/**
* @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the localBroker.
*/
public Transport getLocalBroker() {
return localBroker;
}
/**
* @return Returns the remoteBroker.
*/
public Transport getRemoteBroker() {
return remoteBroker;
}
/**
* @return the createdByDuplex
*/
public boolean isCreatedByDuplex() {
return this.createdByDuplex;
}
/**
* @param createdByDuplex the createdByDuplex to set
*/
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if (brokerPath != null) { if (brokerPath != null) {
for (int i = 0; i < brokerPath.length; i++) { for (BrokerId id : brokerPath) {
if (brokerId.equals(brokerPath[i])) { if (brokerId.equals(id)) {
return true; return true;
} }
} }
@ -1086,10 +1046,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ActiveMQDestination[] dests = staticallyIncludedDestinations; ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
for (int i = 0; i < dests.length; i++) { for (ActiveMQDestination dest : dests) {
ActiveMQDestination match = dests[i]; DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
return true; return true;
} }
} }
@ -1097,10 +1056,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
dests = excludedDestinations; dests = excludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
for (int i = 0; i < dests.length; i++) { for (ActiveMQDestination dest : dests) {
ActiveMQDestination match = dests[i]; DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match); if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
return false; return false;
} }
} }
@ -1108,10 +1066,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
dests = dynamicallyIncludedDestinations; dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
for (int i = 0; i < dests.length; i++) { for (ActiveMQDestination dest : dests) {
ActiveMQDestination match = dests[i]; DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
return true; return true;
} }
} }
@ -1127,8 +1084,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void setupStaticDestinations() { protected void setupStaticDestinations() {
ActiveMQDestination[] dests = staticallyIncludedDestinations; ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null) { if (dests != null) {
for (int i = 0; i < dests.length; i++) { for (ActiveMQDestination dest : dests) {
ActiveMQDestination dest = dests[i];
DemandSubscription sub = createDemandSubscription(dest); DemandSubscription sub = createDemandSubscription(dest);
try { try {
addSubscription(sub); addSubscription(sub);
@ -1164,21 +1120,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
/* /*
* check our existing subs networkConsumerIds against the list of network ids in this subscription * check our existing subs networkConsumerIds against the list of network
* A match means a duplicate which we suppress for topics and maybe for queues * ids in this subscription A match means a duplicate which we suppress for
* topics and maybe for queues
*/ */
private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false; boolean suppress = false;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) { && !configuration.isSuppressDuplicateTopicSubscriptions()) {
return suppress; return suppress;
} }
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
Collection<Subscription> currentSubs = Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
getRegionSubscriptions(consumerInfo.getDestination());
for (Subscription sub : currentSubs) { for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) { if (!networkConsumers.isEmpty()) {
@ -1196,7 +1152,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
private boolean isInActiveDurableSub(Subscription sub) { private boolean isInActiveDurableSub(Subscription sub) {
return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive()); return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
} }
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
@ -1204,9 +1160,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo
+ ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: "
+ existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); + existingSub.getConsumerInfo().getNetworkConsumerIds());
} }
suppress = true; suppress = true;
} else { } else {
@ -1215,9 +1171,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
removeDuplicateSubscription(existingSub); removeDuplicateSubscription(existingSub);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from "
+ " with sub from " + remoteBrokerName + remoteBrokerName + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ candidateInfo.getNetworkConsumerIds()); + candidateInfo.getNetworkConsumerIds());
} }
} catch (IOException e) { } catch (IOException e) {
@ -1252,26 +1207,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Collection<Subscription> subs; Collection<Subscription> subs;
region = null; region = null;
switch ( dest.getDestinationType() ) switch (dest.getDestinationType()) {
{
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
region = region_broker.getQueueRegion(); region = region_broker.getQueueRegion();
break; break;
case ActiveMQDestination.TOPIC_TYPE: case ActiveMQDestination.TOPIC_TYPE:
region = region_broker.getTopicRegion(); region = region_broker.getTopicRegion();
break; break;
case ActiveMQDestination.TEMP_QUEUE_TYPE: case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = region_broker.getTempQueueRegion(); region = region_broker.getTempQueueRegion();
break; break;
case ActiveMQDestination.TEMP_TOPIC_TYPE: case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = region_broker.getTempTopicRegion(); region = region_broker.getTempTopicRegion();
break; break;
} }
if ( region instanceof AbstractRegion ) { if (region instanceof AbstractRegion) {
subs = ((AbstractRegion) region).getSubscriptions().values(); subs = ((AbstractRegion) region).getSubscriptions().values();
} else { } else {
subs = null; subs = null;
@ -1281,7 +1232,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
//add our original id to ourselves // add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId()); info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info); return doCreateDemandSubscription(info);
} }
@ -1318,8 +1269,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// Indicate that this subscription is being made on behalf of the remote broker. // Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[] { remoteBrokerId }); info.setBrokerPath(new BrokerId[] { remoteBrokerId });
// the remote info held by the DemandSubscription holds the original consumerId, // the remote info held by the DemandSubscription holds the original
// the local info get's overwritten // consumerId, the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
DemandSubscription result = null; DemandSubscription result = null;
try { try {
@ -1331,7 +1282,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())){ if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
sub.getLocalInfo().setDispatchAsync(true); sub.getLocalInfo().setDispatchAsync(true);
} else { } else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
@ -1354,7 +1305,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeDemandSubscription(ConsumerId id) throws IOException { protected void removeDemandSubscription(ConsumerId id) throws IOException {
DemandSubscription sub = subscriptionMapByRemoteId.remove(id); DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub); LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id
+ ", matching sub: " + sub);
} }
if (sub != null) { if (sub != null) {
removeSubscription(sub); removeSubscription(sub);
@ -1379,8 +1331,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
/** /**
* Performs a timed wait on the started latch and then checks for disposed before performing * Performs a timed wait on the started latch and then checks for disposed
* another wait each time the the started wait times out. * before performing another wait each time the the started wait times out.
* *
* @throws InterruptedException * @throws InterruptedException
*/ */
@ -1403,44 +1355,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL()); return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
} }
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
synchronized (brokerInfoMutex) {
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
}
safeWaitUntilStarted();
ServiceSupport.dispose(this);
}
}
}
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
} }
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
synchronized (brokerInfoMutex) {
BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
}
ServiceSupport.dispose(this);
}
}
if (!disposed.get()) {
triggerLocalStartBridge();
}
}
}
protected BrokerId[] getRemoteBrokerPath() { protected BrokerId[] getRemoteBrokerPath() {
return remoteBrokerPath; return remoteBrokerPath;
} }
@ -1457,6 +1375,95 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
/**
* @param dynamicallyIncludedDestinations
* The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public ActiveMQDestination[] getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations
* The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public ActiveMQDestination[] getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations
* The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
/**
* @return Returns the durableDestinations.
*/
public ActiveMQDestination[] getDurableDestinations() {
return durableDestinations;
}
/**
* @param durableDestinations
* The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the localBroker.
*/
public Transport getLocalBroker() {
return localBroker;
}
/**
* @return Returns the remoteBroker.
*/
public Transport getRemoteBroker() {
return remoteBroker;
}
/**
* @return the createdByDuplex
*/
public boolean isCreatedByDuplex() {
return this.createdByDuplex;
}
/**
* @param createdByDuplex
* the createdByDuplex to set
*/
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
@Override @Override
public String getRemoteAddress() { public String getRemoteAddress() {
return remoteBroker.getRemoteAddress(); return remoteBroker.getRemoteAddress();