diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 06c8131ed3..abfad97ba7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -127,7 +127,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected NetworkBridgeConfiguration configuration; protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); - protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null}; + protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; protected Object brokerInfoMutex = new Object(); protected BrokerId remoteBrokerId; @@ -224,7 +224,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.warn("Caught exception from remote start", e); } } else { - LOG.warn ("Bridge was disposed before the start() method was fully executed."); + LOG.warn("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); } } @@ -288,16 +288,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // sync requests that may fail Object resp = localBroker.request(localConnectionInfo); if (resp instanceof ExceptionResponse) { - throw ((ExceptionResponse)resp).getException(); + throw ((ExceptionResponse) resp).getException(); } localSessionInfo = new SessionInfo(localConnectionInfo, 1); localBroker.oneway(localSessionInfo); if (configuration.isDuplex()) { - // separate inbound chanel for forwards so we don't contend with outbound dispatch on same connection + // separate in-bound chamnel for forwards so we don't + // contend with out-bound dispatch on same connection ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName()); + duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" + + configuration.getBrokerName()); duplexLocalConnectionInfo.setUserName(configuration.getUserName()); duplexLocalConnectionInfo.setPassword(configuration.getPassword()); @@ -308,7 +310,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // sync requests that may fail resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); if (resp instanceof ExceptionResponse) { - throw ((ExceptionResponse)resp).getException(); + throw ((ExceptionResponse) resp).getException(); } SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); @@ -323,7 +325,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); } else { - LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed."); + LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); } startedLatch.countDown(); localStartedLatch.countDown(); @@ -334,7 +336,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (!disposed.get()) { setupStaticDestinations(); } else { - LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); + LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + + ") was interrupted during establishment."); } } } @@ -374,12 +377,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br producerInfo = new ProducerInfo(remoteSessionInfo, 1); producerInfo.setResponseRequired(false); remoteBroker.oneway(producerInfo); - // Listen to consumer advisory messages on the remote broker to - // determine demand. + // Listen to consumer advisory messages on the remote broker to determine demand. if (!configuration.isStaticBridge()) { demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); - // always dispatch advisory message asynchronously so that we never block the producer - // broker if we are slow + // always dispatch advisory message asynchronously so that + // we never block the producer broker if we are slow demandConsumerInfo.setDispatchAsync(true); String advisoryTopic = configuration.getDestinationFilter(); if (configuration.isBridgeTempDestinations()) { @@ -488,17 +490,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br IntrospectionSupport.getProperties(configuration, props, null); if (configuration.getExcludedDestinations() != null) { excludedDestinations = configuration.getExcludedDestinations().toArray( - new ActiveMQDestination[configuration.getExcludedDestinations().size()]); + new ActiveMQDestination[configuration.getExcludedDestinations().size()]); } if (configuration.getStaticallyIncludedDestinations() != null) { staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( - new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); + new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); } if (configuration.getDynamicallyIncludedDestinations() != null) { - dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations() - .toArray( - new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations() - .size()]); + dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); } } catch (Throwable t) { LOG.error("Error mapping remote destinations", t); @@ -514,7 +514,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { if (isDuplex()) { if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " duplex command type: "+ command.getCommandId()); + LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId()); } if (command.isMessage()) { final ActiveMQMessage message = (ActiveMQMessage) command; @@ -526,11 +526,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (!isPermissableDestination(message.getDestination(), true)) { return; } - // message being forwarded - we need to propagate the response to our local send + // message being forwarded - we need to + // propagate the response to our local send message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { final int correlationId = message.getCommandId(); + @Override public void onCompletion(FutureResponse resp) { try { @@ -641,10 +643,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (path != null && path.length >= networkTTL) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + + " network hops only : " + info); } return; } + if (contains(path, localBrokerPath[0])) { // Ignore this consumer as it's a consumer we locally sent to the broker. if (LOG.isDebugEnabled()) { @@ -652,10 +656,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } return; } + if (!isPermissableDestination(info.getDestination())) { // ignore if not in the permitted or in the excluded list if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + + " is not permiited :" + info); } return; } @@ -669,7 +675,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } else { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + + " as already subscribed to matching destination : " + info); } } } @@ -698,10 +705,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo); + LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + + " from " + remoteBrokerName + ", destination: " + destInfo); } if (destInfo.isRemoveOperation()) { - // Serialize with removeSub operations such that all removeSub advisories are generated + // Serialize with removeSub operations such that all removeSub advisories + // are generated serialExecutor.execute(new Runnable() { @Override public void run() { @@ -729,10 +738,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { if (!disposed.get()) { - if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary() ) { - // not a reason to terminate the bridge - temps can disappear with pending sends as the demand sub may outlive the remote dest + if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { + // not a reason to terminate the bridge - temps can disappear with + // pending sends as the demand sub may outlive the remote dest if (messageDispatch != null) { - LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error); + LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error); try { MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); poisonAck.setPoisonCause(error); @@ -742,7 +752,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } fireFailedForwardAdvisory(messageDispatch, error); } else { - LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error); + LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error); } return; } @@ -771,9 +781,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); - advisoryBroker.fireAdvisory(context, - AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), - messageDispatch.getMessage(), null, advisoryMessage); + advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, + advisoryMessage); } } catch (Exception e) { @@ -798,9 +807,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void removeSubscription(final DemandSubscription sub) throws IOException { if (sub != null) { if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " remove local subscription:" - + sub.getLocalInfo().getConsumerId() - + " for remote " + sub.getRemoteInfo().getConsumerId()); + LOG.trace(configuration.getBrokerName() + " remove local subscription:" + sub.getLocalInfo().getConsumerId() + " for remote " + + sub.getRemoteInfo().getConsumerId()); } // ensure not available for conduit subs pending removal @@ -808,8 +816,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); // continue removal in separate thread to free up this thread for outstanding responses - // Serialize with removeDestination operations so that removeSubs are serialised with removeDestinations - // such that all removeSub advisories are generated + // Serialize with removeDestination operations so that removeSubs are serialized with + // removeDestinations such that all removeSub advisories are generated serialExecutor.execute(new Runnable() { @Override public void run() { @@ -852,7 +860,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (suppressMessageDispatch(md, sub)) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage()); + LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + + ", message: " + md.getMessage()); } // still ack as it may be durable try { @@ -865,14 +875,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br Message message = configureMessage(md); if (LOG.isDebugEnabled()) { - LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); + LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); } if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) { - // If the message was originally sent using async - // send, we will preserve that QOS - // by bridging it using an async send (small chance + // If the message was originally sent using async send, we will + // preserve that QOS by bridging it using an async send (small chance // of message loss). try { remoteBroker.oneway(message); @@ -884,9 +895,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { - // The message was not sent using async send, so we - // should only ack the local - // broker when we get confirmation that the remote + // The message was not sent using async send, so we should only + // ack the local broker when we get confirmation that the remote // broker has received the message. ResponseCallback callback = new ResponseCallback() { @Override @@ -909,11 +919,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }; remoteBroker.asyncRequest(message, callback); - } } else { if (LOG.isDebugEnabled()) { - LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); + LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + + md.getMessage()); } } } else if (command.isBrokerInfo()) { @@ -927,10 +937,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceLocalException(ce.getException()); } else { switch (command.getDataStructureType()) { - case WireFormatInfo.DATA_STRUCTURE_TYPE: - break; - default: - LOG.warn("Unexpected local command: " + command); + case WireFormatInfo.DATA_STRUCTURE_TYPE: + break; + default: + LOG.warn("Unexpected local command: " + command); } } } catch (Throwable e) { @@ -940,10 +950,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { + synchronized (brokerInfoMutex) { + if (remoteBrokerId != null) { + if (remoteBrokerId.equals(localBrokerId)) { + if (LOG.isTraceEnabled()) { + LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + + remoteBrokerId); + } + safeWaitUntilStarted(); + ServiceSupport.dispose(this); + } + } + } + } + + protected void serviceRemoteBrokerInfo(Command command) throws IOException { + synchronized (brokerInfoMutex) { + BrokerInfo remoteBrokerInfo = (BrokerInfo) command; + remoteBrokerId = remoteBrokerInfo.getBrokerId(); + remoteBrokerPath[0] = remoteBrokerId; + remoteBrokerName = remoteBrokerInfo.getBrokerName(); + if (localBrokerId != null) { + if (localBrokerId.equals(remoteBrokerId)) { + if (LOG.isTraceEnabled()) { + LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + + remoteBrokerId); + } + ServiceSupport.dispose(this); + } + } + if (!disposed.get()) { + triggerLocalStartBridge(); + } + } + } + private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { boolean suppress = false; - // for durable subs, suppression via filter leaves dangling acks so we need to - // check here and allow the ack irrespective + // for durable subs, suppression via filter leaves dangling acks so we + // need to check here and allow the ack irrespective if (sub.getLocalInfo().isDurable()) { MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); messageEvalContext.setMessageReference(md.getMessage()); @@ -953,96 +999,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } - /** - * @return Returns the dynamicallyIncludedDestinations. - */ - public ActiveMQDestination[] getDynamicallyIncludedDestinations() { - return dynamicallyIncludedDestinations; - } - - /** - * @param dynamicallyIncludedDestinations The - * dynamicallyIncludedDestinations to set. - */ - public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { - this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; - } - - /** - * @return Returns the excludedDestinations. - */ - public ActiveMQDestination[] getExcludedDestinations() { - return excludedDestinations; - } - - /** - * @param excludedDestinations The excludedDestinations to set. - */ - public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { - this.excludedDestinations = excludedDestinations; - } - - /** - * @return Returns the staticallyIncludedDestinations. - */ - public ActiveMQDestination[] getStaticallyIncludedDestinations() { - return staticallyIncludedDestinations; - } - - /** - * @param staticallyIncludedDestinations The staticallyIncludedDestinations - * to set. - */ - public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { - this.staticallyIncludedDestinations = staticallyIncludedDestinations; - } - - /** - * @return Returns the durableDestinations. - */ - public ActiveMQDestination[] getDurableDestinations() { - return durableDestinations; - } - - /** - * @param durableDestinations The durableDestinations to set. - */ - public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { - this.durableDestinations = durableDestinations; - } - - /** - * @return Returns the localBroker. - */ - public Transport getLocalBroker() { - return localBroker; - } - - /** - * @return Returns the remoteBroker. - */ - public Transport getRemoteBroker() { - return remoteBroker; - } - - /** - * @return the createdByDuplex - */ - public boolean isCreatedByDuplex() { - return this.createdByDuplex; - } - - /** - * @param createdByDuplex the createdByDuplex to set - */ - public void setCreatedByDuplex(boolean createdByDuplex) { - this.createdByDuplex = createdByDuplex; - } - public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { if (brokerPath != null) { - for (int i = 0; i < brokerPath.length; i++) { - if (brokerId.equals(brokerPath[i])) { + for (BrokerId id : brokerPath) { + if (brokerId.equals(id)) { return true; } } @@ -1086,10 +1046,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ActiveMQDestination[] dests = staticallyIncludedDestinations; if (dests != null && dests.length > 0) { - for (int i = 0; i < dests.length; i++) { - ActiveMQDestination match = dests[i]; - DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); - if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { return true; } } @@ -1097,10 +1056,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br dests = excludedDestinations; if (dests != null && dests.length > 0) { - for (int i = 0; i < dests.length; i++) { - ActiveMQDestination match = dests[i]; - DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match); - if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { + for (ActiveMQDestination dest : dests) { + DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { return false; } } @@ -1108,10 +1066,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br dests = dynamicallyIncludedDestinations; if (dests != null && dests.length > 0) { - for (int i = 0; i < dests.length; i++) { - ActiveMQDestination match = dests[i]; - DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match); - if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) { + for (ActiveMQDestination dest : dests) { + DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); + if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { return true; } } @@ -1127,8 +1084,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void setupStaticDestinations() { ActiveMQDestination[] dests = staticallyIncludedDestinations; if (dests != null) { - for (int i = 0; i < dests.length; i++) { - ActiveMQDestination dest = dests[i]; + for (ActiveMQDestination dest : dests) { DemandSubscription sub = createDemandSubscription(dest); try { addSubscription(sub); @@ -1164,21 +1120,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /* - * check our existing subs networkConsumerIds against the list of network ids in this subscription - * A match means a duplicate which we suppress for topics and maybe for queues + * check our existing subs networkConsumerIds against the list of network + * ids in this subscription A match means a duplicate which we suppress for + * topics and maybe for queues */ private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); boolean suppress = false; - if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || - consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) { + if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() + && !configuration.isSuppressDuplicateTopicSubscriptions()) { return suppress; } List candidateConsumers = consumerInfo.getNetworkConsumerIds(); - Collection currentSubs = - getRegionSubscriptions(consumerInfo.getDestination()); + Collection currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); for (Subscription sub : currentSubs) { List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); if (!networkConsumers.isEmpty()) { @@ -1196,7 +1152,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } private boolean isInActiveDurableSub(Subscription sub) { - return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive()); + return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); } private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { @@ -1204,9 +1160,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName - + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " - + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); + LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo + + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: " + + existingSub.getConsumerInfo().getNetworkConsumerIds()); } suppress = true; } else { @@ -1215,10 +1171,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br removeDuplicateSubscription(existingSub); if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() - + " with sub from " + remoteBrokerName - + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " - + candidateInfo.getNetworkConsumerIds()); + LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from " + + remoteBrokerName + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " + + candidateInfo.getNetworkConsumerIds()); } } catch (IOException e) { LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e); @@ -1252,26 +1207,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br Collection subs; region = null; - switch ( dest.getDestinationType() ) - { + switch (dest.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: region = region_broker.getQueueRegion(); break; - case ActiveMQDestination.TOPIC_TYPE: region = region_broker.getTopicRegion(); break; - case ActiveMQDestination.TEMP_QUEUE_TYPE: region = region_broker.getTempQueueRegion(); break; - case ActiveMQDestination.TEMP_TOPIC_TYPE: region = region_broker.getTempTopicRegion(); break; } - if ( region instanceof AbstractRegion ) { + if (region instanceof AbstractRegion) { subs = ((AbstractRegion) region).getSubscriptions().values(); } else { subs = null; @@ -1281,7 +1232,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { - //add our original id to ourselves + // add our original id to ourselves info.addNetworkConsumerId(info.getConsumerId()); return doCreateDemandSubscription(info); } @@ -1318,8 +1269,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // Indicate that this subscription is being made on behalf of the remote broker. info.setBrokerPath(new BrokerId[] { remoteBrokerId }); - // the remote info held by the DemandSubscription holds the original consumerId, - // the local info get's overwritten + // the remote info held by the DemandSubscription holds the original + // consumerId, the local info get's overwritten info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); DemandSubscription result = null; try { @@ -1331,7 +1282,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { - if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())){ + if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { sub.getLocalInfo().setDispatchAsync(true); } else { sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); @@ -1345,7 +1296,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // This works for now since we use a VM connection to the local broker. // may need to change if we ever subscribe to a remote broker. sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); - } else { + } else { // need to ack this message if it is ignored as it is durable so // we check before we send. see: suppressMessageDispatch() } @@ -1354,7 +1305,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void removeDemandSubscription(ConsumerId id) throws IOException { DemandSubscription sub = subscriptionMapByRemoteId.remove(id); if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub); + LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + + ", matching sub: " + sub); } if (sub != null) { removeSubscription(sub); @@ -1379,8 +1331,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } /** - * Performs a timed wait on the started latch and then checks for disposed before performing - * another wait each time the the started wait times out. + * Performs a timed wait on the started latch and then checks for disposed + * before performing another wait each time the the started wait times out. * * @throws InterruptedException */ @@ -1403,45 +1355,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL()); } - protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { - synchronized (brokerInfoMutex) { - if (remoteBrokerId != null) { - if (remoteBrokerId.equals(localBrokerId)) { - if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId); - } - safeWaitUntilStarted(); - ServiceSupport.dispose(this); - } - } - } - } - protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); } - protected void serviceRemoteBrokerInfo(Command command) throws IOException { - synchronized (brokerInfoMutex) { - BrokerInfo remoteBrokerInfo = (BrokerInfo)command; - remoteBrokerId = remoteBrokerInfo.getBrokerId(); - remoteBrokerPath[0] = remoteBrokerId; - remoteBrokerName = remoteBrokerInfo.getBrokerName(); - if (localBrokerId != null) { - if (localBrokerId.equals(remoteBrokerId)) { - if (LOG.isTraceEnabled()) { - LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId); - } - ServiceSupport.dispose(this); - } - } - if (!disposed.get()) { - triggerLocalStartBridge(); - } - } - } - - protected BrokerId[] getRemoteBrokerPath() { + protected BrokerId[] getRemoteBrokerPath() { return remoteBrokerPath; } @@ -1457,6 +1375,95 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + /** + * @return Returns the dynamicallyIncludedDestinations. + */ + public ActiveMQDestination[] getDynamicallyIncludedDestinations() { + return dynamicallyIncludedDestinations; + } + + /** + * @param dynamicallyIncludedDestinations + * The dynamicallyIncludedDestinations to set. + */ + public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { + this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; + } + + /** + * @return Returns the excludedDestinations. + */ + public ActiveMQDestination[] getExcludedDestinations() { + return excludedDestinations; + } + + /** + * @param excludedDestinations + * The excludedDestinations to set. + */ + public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { + this.excludedDestinations = excludedDestinations; + } + + /** + * @return Returns the staticallyIncludedDestinations. + */ + public ActiveMQDestination[] getStaticallyIncludedDestinations() { + return staticallyIncludedDestinations; + } + + /** + * @param staticallyIncludedDestinations + * The staticallyIncludedDestinations to set. + */ + public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { + this.staticallyIncludedDestinations = staticallyIncludedDestinations; + } + + /** + * @return Returns the durableDestinations. + */ + public ActiveMQDestination[] getDurableDestinations() { + return durableDestinations; + } + + /** + * @param durableDestinations + * The durableDestinations to set. + */ + public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { + this.durableDestinations = durableDestinations; + } + + /** + * @return Returns the localBroker. + */ + public Transport getLocalBroker() { + return localBroker; + } + + /** + * @return Returns the remoteBroker. + */ + public Transport getRemoteBroker() { + return remoteBroker; + } + + /** + * @return the createdByDuplex + */ + public boolean isCreatedByDuplex() { + return this.createdByDuplex; + } + + /** + * @param createdByDuplex + * the createdByDuplex to set + */ + public void setCreatedByDuplex(boolean createdByDuplex) { + this.createdByDuplex = createdByDuplex; + } + @Override public String getRemoteAddress() { return remoteBroker.getRemoteAddress();