diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index c1779c2495..9e7a322887 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -89,7 +89,7 @@ import org.apache.commons.logging.LogFactory; * @version $Revision$ */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { - + private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class); private static final ThreadPoolExecutor ASYNC_TASKS; protected final Transport localBroker; @@ -114,7 +114,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap(); protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); - protected final BrokerId localBrokerPath[] = new BrokerId[] {null}; + protected final BrokerId localBrokerPath[] = new BrokerId[] { null }; protected CountDownLatch startedLatch = new CountDownLatch(2); protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); @@ -125,12 +125,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final AtomicLong enqueueCounter = new AtomicLong(); final AtomicLong dequeueCounter = new AtomicLong(); - + private NetworkBridgeListener networkBridgeListener; private boolean createdByDuplex; private BrokerInfo localBrokerInfo; private BrokerInfo remoteBrokerInfo; - private AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; @@ -155,7 +154,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object o) { - Command command = (Command)o; + Command command = (Command) o; serviceLocalCommand(command); } @@ -166,7 +165,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br remoteBroker.setTransportListener(new TransportListener() { public void onCommand(Object o) { - Command command = (Command)o; + Command command = (Command) o; serviceRemoteCommand(command); } @@ -290,10 +289,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localConnectionInfo.setPassword(configuration.getPassword()); Transport originalTransport = remoteBroker; while (originalTransport instanceof TransportFilter) { - originalTransport = ((TransportFilter)originalTransport).getNext(); + originalTransport = ((TransportFilter) originalTransport).getNext(); } if (originalTransport instanceof SslTransport) { - X509Certificate[] peerCerts = ((SslTransport)originalTransport).getPeerCertificates(); + X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); localConnectionInfo.setTransportContext(peerCerts); } localBroker.oneway(localConnectionInfo); @@ -385,10 +384,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } finally { sendShutdown.countDown(); } - + } }); - if( !sendShutdown.await(10, TimeUnit.SECONDS) ) { + if (!sendShutdown.await(10, TimeUnit.SECONDS)) { LOG.info("Network Could not shutdown in a timely manner"); } } finally { @@ -424,12 +423,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - protected void serviceRemoteCommand(Command command) { + protected void serviceRemoteCommand(Command command) { if (!disposed.get()) { try { if (command.isMessageDispatch()) { waitStarted(); - MessageDispatch md = (MessageDispatch)command; + MessageDispatch md = (MessageDispatch) command; serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); demandConsumerDispatched++; if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { @@ -438,7 +437,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } else if (command.isBrokerInfo()) { lastConnectSucceeded.set(true); - remoteBrokerInfo = (BrokerInfo)command; + remoteBrokerInfo = (BrokerInfo) command; Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); try { IntrospectionSupport.getProperties(configuration, props, null); @@ -463,18 +462,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // Let the local broker know the remote broker's ID. localBroker.oneway(command); } else if (command.getClass() == ConnectionError.class) { - ConnectionError ce = (ConnectionError)command; + ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); } else { if (isDuplex()) { if (command.isMessage()) { - ActiveMQMessage message = (ActiveMQMessage)command; + ActiveMQMessage message = (ActiveMQMessage) command; if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) { serviceRemoteConsumerAdvisory(message.getDataStructure()); } else { - if (!isPermissableDestination(message.getDestination(), true)) { - return; - } + if (!isPermissableDestination(message.getDestination(), true)) { + return; + } if (message.isResponseRequired()) { Response reply = new Response(); reply.setCorrelationId(message.getCommandId()); @@ -492,20 +491,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.oneway(command); break; case ConsumerInfo.DATA_STRUCTURE_TYPE: - localStartedLatch.await(); + localStartedLatch.await(); if (started.get()) { if (!addConsumerInfo((ConsumerInfo) command)) { if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring ConsumerInfo: "+ command); + LOG.debug("Ignoring ConsumerInfo: " + command); } } else { if (LOG.isTraceEnabled()) { - LOG.trace("Adding ConsumerInfo: "+ command); + LOG.trace("Adding ConsumerInfo: " + command); } } } else { // received a subscription whilst stopping - LOG.warn("Stopping - ignoring ConsumerInfo: "+ command); + LOG.warn("Stopping - ignoring ConsumerInfo: " + command); } break; default: @@ -538,9 +537,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final int networkTTL = configuration.getNetworkTTL(); if (data.getClass() == ConsumerInfo.class) { // Create a new local subscription - ConsumerInfo info = (ConsumerInfo)data; + ConsumerInfo info = (ConsumerInfo) data; BrokerId[] path = info.getBrokerPath(); - + if (path != null && path.length >= networkTTL) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); @@ -553,7 +552,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); } return; - } + } if (!isPermissableDestination(info.getDestination())) { // ignore if not in the permitted or in the excluded list if (LOG.isDebugEnabled()) { @@ -561,10 +560,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } return; } - + // in a cyclic network there can be multiple bridges per broker that can propagate // a network subscription so there is a need to synchronise on a shared entity - synchronized(brokerService.getVmConnectorURI()) { + synchronized (brokerService.getVmConnectorURI()) { if (addConsumerInfo(info)) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info); @@ -578,7 +577,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == DestinationInfo.class) { // It's a destination info - we want to pass up // information about temporary destinations - DestinationInfo destInfo = (DestinationInfo)data; + DestinationInfo destInfo = (DestinationInfo) data; BrokerId[] path = destInfo.getBrokerPath(); if (path != null && path.length >= networkTTL) { if (LOG.isDebugEnabled()) { @@ -597,7 +596,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br destInfo.setConnectionId(localConnectionInfo.getConnectionId()); if (destInfo.getDestination() instanceof ActiveMQTempDestination) { // re-set connection id so comes from here - ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destInfo.getDestination(); + ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); } destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); @@ -606,7 +605,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } localBroker.oneway(destInfo); } else if (data.getClass() == RemoveInfo.class) { - ConsumerId id = (ConsumerId)((RemoveInfo)data).getObjectId(); + ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); } } @@ -640,7 +639,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId()); } subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); - + // continue removal in separate thread to free up this thread for outstanding responses ASYNC_TASKS.execute(new Runnable() { public void run() { @@ -673,41 +672,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br try { if (command.isMessageDispatch()) { enqueueCounter.incrementAndGet(); - final MessageDispatch md = (MessageDispatch)command; + final MessageDispatch md = (MessageDispatch) command; final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); - if (sub != null && md.getMessage()!=null) { + if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { // See if this consumer's brokerPath tells us it came from the broker at the other end // of the bridge. I think we should be making this decision based on the message's // broker bread crumbs and not the consumer's? However, the message's broker bread // crumbs are null, which is another matter. boolean cameFromRemote = false; - Object consumerInfo = md.getMessage().getDataStructure(); - if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) ) - cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId()); - + Object consumerInfo = md.getMessage().getDataStructure(); + if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) + cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId()); + Message message = configureMessage(md); if (LOG.isDebugEnabled()) { LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); } - + if (!message.isResponseRequired()) { // If the message was originally sent using async // send, we will preserve that QOS // by bridging it using an async send (small chance // of message loss). - - // Don't send it off to the remote if it originally came from the remote. - if( !cameFromRemote ) { - remoteBroker.oneway(message); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Message not forwarded on to remote, because message came from remote"); + + try { + // Don't send it off to the remote if it originally came from the remote. + if (!cameFromRemote) { + remoteBroker.oneway(message); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Message not forwarded on to remote, because message came from remote"); + } } + + localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); + dequeueCounter.incrementAndGet(); + } finally { + sub.decrementOutstandingResponses(); } - localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); - + } else { // The message was not sent using async send, so we @@ -719,12 +723,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br try { Response response = future.getResult(); if (response.isException()) { - ExceptionResponse er = (ExceptionResponse)response; + ExceptionResponse er = (ExceptionResponse) response; serviceLocalException(er.getException()); } else { localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); dequeueCounter.incrementAndGet(); - + } } catch (IOException e) { serviceLocalException(e); @@ -735,16 +739,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }; remoteBroker.asyncRequest(message, callback); - sub.incrementOutstandingResponses(); } - + } else { if (LOG.isDebugEnabled()) { LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); } } } else if (command.isBrokerInfo()) { - localBrokerInfo = (BrokerInfo)command; + localBrokerInfo = (BrokerInfo) command; serviceLocalBrokerInfo(command); } else if (command.isShutdownInfo()) { LOG.info(configuration.getBrokerName() + " Shutting down"); @@ -757,7 +760,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br stop(); } } else if (command.getClass() == ConnectionError.class) { - ConnectionError ce = (ConnectionError)command; + ConnectionError ce = (ConnectionError) command; serviceLocalException(ce.getException()); } else { switch (command.getDataStructureType()) { @@ -768,7 +771,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } catch (Throwable e) { - LOG.warn("Caught an exception processing local command",e); + LOG.warn("Caught an exception processing local command", e); serviceLocalException(e); } } @@ -783,7 +786,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br /** * @param dynamicallyIncludedDestinations The - * dynamicallyIncludedDestinations to set. + * dynamicallyIncludedDestinations to set. */ public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; @@ -812,7 +815,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br /** * @param staticallyIncludedDestinations The staticallyIncludedDestinations - * to set. + * to set. */ public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { this.staticallyIncludedDestinations = staticallyIncludedDestinations; @@ -883,30 +886,30 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { if (brokerPath == null || brokerPath.length == 0) { - return new BrokerId[] {idToAppend}; + return new BrokerId[] { idToAppend }; } BrokerId rc[] = new BrokerId[brokerPath.length + 1]; System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); rc[brokerPath.length] = idToAppend; return rc; } - + protected boolean isPermissableDestination(ActiveMQDestination destination) { - return isPermissableDestination(destination, false); + return isPermissableDestination(destination, false); } protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { // Are we not bridging temp destinations? if (destination.isTemporary()) { - if (allowTemporary) { - return true; - } else { - return configuration.isBridgeTempDestinations(); - } - } + if (allowTemporary) { + return true; + } else { + return configuration.isBridgeTempDestinations(); + } + } final DestinationFilter filter = DestinationFilter.parseFilter(destination); - + ActiveMQDestination[] dests = excludedDestinations; if (dests != null && dests.length > 0) { for (int i = 0; i < dests.length; i++) { @@ -972,7 +975,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br addRemoteBrokerToBrokerPath(info); DemandSubscription sub = createDemandSubscription(info); if (sub != null) { - if (duplicateSuppressionIsRequired(sub) ) { + if (duplicateSuppressionIsRequired(sub)) { undoMapRegistration(sub); } else { addSubscription(sub); @@ -981,10 +984,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } return consumerAdded; } - + private void undoMapRegistration(DemandSubscription sub) { subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); - subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); + subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); } /* @@ -994,16 +997,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); boolean suppress = false; - + if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) { return suppress; } - - List candidateConsumers = consumerInfo.getNetworkConsumerIds(); + + List candidateConsumers = consumerInfo.getNetworkConsumerIds(); Collection currentSubs = getRegionSubscriptions(consumerInfo.getDestination().isTopic()); for (Subscription sub : currentSubs) { - List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); + List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); if (!networkConsumers.isEmpty()) { if (matchFound(candidateConsumers, networkConsumers)) { suppress = hasLowerPriority(sub, candidate.getLocalInfo()); @@ -1014,10 +1017,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } - private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { boolean suppress = false; - + if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName @@ -1029,7 +1031,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // remove the existing lower priority duplicate and allow this candidate try { removeDuplicateSubscription(existingSub); - + if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from " + remoteBrokerName @@ -1037,23 +1039,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br + candidateInfo.getNetworkConsumerIds()); } } catch (IOException e) { - LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e); + LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e); } } return suppress; } private void removeDuplicateSubscription(Subscription existingSub) throws IOException { - for (NetworkConnector connector: brokerService.getNetworkConnectors()) { + for (NetworkConnector connector : brokerService.getNetworkConnectors()) { if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { break; } - } + } } private boolean matchFound(List candidateConsumers, List networkConsumers) { boolean found = false; - for (ConsumerId aliasConsumer : networkConsumers) { + for (ConsumerId aliasConsumer : networkConsumers) { if (candidateConsumers.contains(aliasConsumer)) { found = true; break; @@ -1068,21 +1070,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br (isTopic ? region.getTopicRegion() : region.getQueueRegion()); return abstractRegion.getSubscriptions().values(); } - + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { //add our original id to ourselves info.addNetworkConsumerId(info.getConsumerId()); return doCreateDemandSubscription(info); } - protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { DemandSubscription result = new DemandSubscription(info); result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); if (info.getDestination().isTemporary()) { // reset the local connection Id - ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination(); + ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); } @@ -1090,7 +1091,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY; if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { // The longer the path to the consumer, the less it's consumer priority. - priority -= info.getBrokerPath().length + 1; + priority -= info.getBrokerPath().length + 1; } result.getLocalInfo().setPriority(priority); if (LOG.isDebugEnabled()) { @@ -1101,19 +1102,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return result; } - final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination){ + final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { ConsumerInfo info = new ConsumerInfo(); info.setDestination(destination); // the remote info held by the DemandSubscription holds the original // consumerId, // the local info get's overwritten - + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); DemandSubscription result = null; try { result = createDemandSubscription(info); } catch (IOException e) { - LOG.error("Failed to create DemandSubscription ",e); + LOG.error("Failed to create DemandSubscription ", e); } if (result != null) { result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY); @@ -1132,7 +1133,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); } - protected void removeDemandSubscription(ConsumerId id) throws IOException { DemandSubscription sub = subscriptionMapByRemoteId.remove(id); if (LOG.isDebugEnabled()) { @@ -1145,17 +1145,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } - + protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { boolean removeDone = false; DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); if (sub != null) { try { removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); - removeDone = true; + removeDone = true; } catch (IOException e) { LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e); - } + } } return removeDone; } @@ -1177,7 +1177,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; - protected abstract BrokerId[] getRemoteBrokerPath(); @@ -1215,17 +1214,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public long getEnqueueCounter() { return enqueueCounter.get(); } - + protected boolean isDuplex() { return configuration.isDuplex() || createdByDuplex; } - + public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } - + static { - ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "NetworkBridge"); thread.setDaemon(true); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index 0f53a885c4..b5ccf9dcf5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -33,10 +33,11 @@ import org.apache.commons.logging.LogFactory; */ public class DemandSubscription { private static final Log LOG = LogFactory.getLog(DemandSubscription.class); - + private final ConsumerInfo remoteInfo; private final ConsumerInfo localInfo; private Set remoteSubsIds = new CopyOnWriteArraySet(); + private AtomicInteger dispatched = new AtomicInteger(0); private AtomicBoolean activeWaiter = new AtomicBoolean(); @@ -44,7 +45,7 @@ public class DemandSubscription { remoteInfo = info; localInfo = info.copy(); localInfo.setNetworkSubscription(true); - remoteSubsIds.add(info.getConsumerId()); + remoteSubsIds.add(info.getConsumerId()); } /** @@ -81,7 +82,6 @@ public class DemandSubscription { return localInfo; } - /** * @return Returns the remoteInfo. */ @@ -111,13 +111,18 @@ public class DemandSubscription { public void decrementOutstandingResponses() { if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { - synchronized(activeWaiter) { + synchronized (activeWaiter) { activeWaiter.notifyAll(); } } } - public void incrementOutstandingResponses() { - dispatched.incrementAndGet(); + public boolean incrementOutstandingResponses() { + dispatched.incrementAndGet(); + if (activeWaiter.get()) { + decrementOutstandingResponses(); + return false; + } + return true; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java index d1e6958d38..4a593a7632 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java @@ -61,16 +61,23 @@ public class ResponseCorrelator extends TransportFilter { } public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException { - Command command = (Command)o; + Command command = (Command) o; command.setCommandId(sequenceGenerator.getNextSequenceId()); command.setResponseRequired(true); FutureResponse future = new FutureResponse(responseCallback); + IOException priorError = null; synchronized (requestMap) { - if( this.error !=null ) { - throw error; + priorError = this.error; + if (priorError == null) { + requestMap.put(new Integer(command.getCommandId()), future); } - requestMap.put(new Integer(command.getCommandId()), future); } + + if (priorError != null) { + future.set(new ExceptionResponse(priorError)); + throw priorError; + } + next.oneway(command); return future; }