From 25a252f34868fa58b3cb9207e8f91a44e720e8e7 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 3 Jul 2007 08:31:10 +0000 Subject: [PATCH] Added duplicate detection to the TransactionBroker - so can cope with rollbacks etc. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@552738 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/broker/Broker.java | 4 + .../apache/activemq/broker/BrokerFilter.java | 7 + .../apache/activemq/broker/BrokerService.java | 24 ++- .../apache/activemq/broker/EmptyBroker.java | 8 +- .../apache/activemq/broker/ErrorBroker.java | 9 +- .../activemq/broker/MutableBrokerFilter.java | 8 + .../activemq/broker/TransactionBroker.java | 54 ++++- .../activemq/broker/TransportConnection.java | 19 +- .../activemq/broker/TransportConnector.java | 1 + .../activemq/broker/ft/MasterBroker.java | 8 +- .../activemq/broker/ft/MasterConnector.java | 189 ++++++++---------- .../broker/region/PrefetchSubscription.java | 49 +++-- .../activemq/broker/region/RegionBroker.java | 38 ++-- .../apache/activemq/state/ProducerState.java | 7 +- .../activemq/transaction/Transaction.java | 4 + .../apache/activemq/broker/StubBroker.java | 8 + 16 files changed, 253 insertions(+), 184 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index b3e2675457..69d8c2370b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -250,4 +250,8 @@ public interface Broker extends Region, Service { * @return the URI that can be used to connect to the local Broker */ public URI getVmConnectorURI(); + + public void brokerServiceStarted(); + + BrokerService getBrokerService(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 8879ccee4a..c78853fddd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -242,5 +242,12 @@ public class BrokerFilter implements Broker { public URI getVmConnectorURI(){ return next.getVmConnectorURI(); } + + public void brokerServiceStarted(){ + next.brokerServiceStarted(); + } + public BrokerService getBrokerService(){ + return next.getBrokerService(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 4eeefe30e3..ba92783578 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -153,6 +153,7 @@ public class BrokerService implements Service { private int persistenceThreadPriority = Thread.MAX_PRIORITY; private boolean useLocalHostBrokerName = false; private CountDownLatch stoppedLatch = new CountDownLatch(1); + private boolean supportFailOver = false; static{ String localHostName = "localhost"; @@ -364,7 +365,7 @@ public class BrokerService implements Service { /** * @return true if this Broker is a slave to a Master */ - public boolean isSlave(){ + public synchronized boolean isSlave(){ return masterConnector != null && masterConnector.isSlave(); } @@ -436,6 +437,7 @@ public class BrokerService implements Service { brokerId = broker.getBrokerId(); log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started"); + getBroker().brokerServiceStarted(); } catch (Exception e) { log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e); @@ -486,7 +488,6 @@ public class BrokerService implements Service { VMTransportFactory.stopped(getBrokerName()); stopped.set(true); stoppedLatch.countDown(); - log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped"); stopper.throwFirstException(); } @@ -1104,6 +1105,20 @@ public class BrokerService implements Service { brokerName=LOCAL_HOST_NAME; } } + + /** + * @return the supportFailOver + */ + public boolean isSupportFailOver(){ + return this.supportFailOver; + } + + /** + * @param supportFailOver the supportFailOver to set + */ + public void setSupportFailOver(boolean supportFailOver){ + this.supportFailOver=supportFailOver; + } // Implementation methods // ------------------------------------------------------------------------- @@ -1649,6 +1664,7 @@ public class BrokerService implements Service { } if (service instanceof MasterConnector) { masterConnector = (MasterConnector) service; + supportFailOver=true; } } @@ -1675,8 +1691,6 @@ public class BrokerService implements Service { broker.addDestination(adminConnectionContext, destination); } } - } - - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 2b5f9c2713..bef07ae7d3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -241,5 +241,11 @@ public class EmptyBroker implements Broker { public URI getVmConnectorURI(){ return null; } - + + public void brokerServiceStarted(){ + } + + public BrokerService getBrokerService(){ + return null; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index e93d638db8..57e58ec3a0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -240,5 +240,12 @@ public class ErrorBroker implements Broker { public URI getVmConnectorURI(){ throw new BrokerStoppedException(this.message); } - + + public void brokerServiceStarted(){ + throw new BrokerStoppedException(this.message); + } + + public BrokerService getBrokerService(){ + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 9e58900005..7c544e5761 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -254,5 +254,13 @@ public class MutableBrokerFilter implements Broker { public URI getVmConnectorURI(){ return getNext().getVmConnectorURI(); } + + public void brokerServiceStarted(){ + getNext().brokerServiceStarted(); + } + + public BrokerService getBrokerService(){ + return getNext().getBrokerService(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index c922ebbfae..3d0a12f2bc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.Message; @@ -28,6 +29,7 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; import org.apache.activemq.transaction.LocalTransaction; +import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transaction.XATransaction; import org.apache.activemq.util.IOExceptionSupport; @@ -36,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import javax.jms.JMSException; + import javax.transaction.xa.XAException; import java.util.ArrayList; @@ -55,6 +58,7 @@ public class TransactionBroker extends BrokerFilter { // The prepared XA transactions. private TransactionStore transactionStore; private Map xaTransactions = new LinkedHashMap(); + ActiveMQMessageAudit audit; public TransactionBroker(Broker next, TransactionStore transactionStore) { super(next); @@ -189,20 +193,41 @@ public class TransactionBroker extends BrokerFilter { } } - public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { - // This method may be invoked recursively. + public void send(ProducerBrokerExchange producerExchange,final Message message) throws Exception{ + // This method may be invoked recursively. // Track original tx so that it can be restored. - final ConnectionContext context = producerExchange.getConnectionContext(); - Transaction originalTx = context.getTransaction(); + final ConnectionContext context=producerExchange.getConnectionContext(); + Transaction originalTx=context.getTransaction(); Transaction transaction=null; - if( message.getTransactionId()!=null ) { - transaction = getTransaction(context, message.getTransactionId(), false); + Synchronization sync=null; + if(message.getTransactionId()!=null){ + transaction=getTransaction(context,message.getTransactionId(),false); + if(transaction!=null){ + sync=new Synchronization(){ + + public void afterRollback(){ + if(audit!=null){ + audit.rollbackMessageReference(message); + } + } + }; + transaction.addSynchronization(sync); + } } - context.setTransaction(transaction); - try { - next.send(producerExchange, message); - } finally { - context.setTransaction(originalTx); + if(audit==null||!audit.isDuplicateMessageReference(message)){ + context.setTransaction(transaction); + try{ + next.send(producerExchange,message); + }finally{ + context.setTransaction(originalTx); + } + }else{ + if(sync!=null&&transaction!=null){ + transaction.removeSynchronization(sync); + } + if(log.isDebugEnabled()){ + log.debug("IGNORING duplicate message "+message); + } } } @@ -248,5 +273,12 @@ public class TransactionBroker extends BrokerFilter { xaTransactions.remove(xid); } } + + public synchronized void brokerServiceStarted(){ + super.brokerServiceStarted(); + if(getBrokerService().isSupportFailOver()&&audit==null){ + audit=new ActiveMQMessageAudit(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 959e0a648a..61bd41b0ea 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -479,24 +479,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit public Response processMessage(Message messageSend) throws Exception{ ProducerId producerId=messageSend.getProducerId(); ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId); - ProducerState producerState = null; - if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){ - producerState=producerExchange.getProducerState(); - } - if(producerState!=null){ - long seq=messageSend.getMessageId().getProducerSequenceId(); - if(seq>producerState.getLastSequenceId()){ - producerState.setLastSequenceId(seq); - broker.send(producerExchange,messageSend); - }else { - if (log.isDebugEnabled()) { - log.debug("Discarding duplicate: " + messageSend); - } - } - }else{ - // producer not local to this broker - broker.send(producerExchange,messageSend); - } + broker.send(producerExchange,messageSend); return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 1f88e812a6..8ddb90e2f1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -132,6 +132,7 @@ public class TransportConnector implements Connector { this.broker = broker; brokerInfo.setBrokerId(broker.getBrokerId()); brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); + brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); } public void setBrokerName(String brokerName) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java index 6be3a7c415..d92fdb91ca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java @@ -288,9 +288,11 @@ public class MasterBroker extends InsertableMutableBrokerFilter{ mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); mdn.setDestination(messageDispatch.getDestination()); - if(messageDispatch.getMessage()!=null) - mdn.setMessageId(messageDispatch.getMessage().getMessageId()); - sendAsyncToSlave(mdn); + if(messageDispatch.getMessage()!=null){ + Message msg=messageDispatch.getMessage(); + mdn.setMessageId(msg.getMessageId()); + sendAsyncToSlave(mdn); + } super.processDispatch(messageDispatch); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java index 25229a5f0e..8ec9d19abe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java @@ -55,146 +55,137 @@ import java.util.concurrent.atomic.AtomicBoolean; * * @version $Revision$ */ -public class MasterConnector implements Service, BrokerServiceAware { +public class MasterConnector implements Service,BrokerServiceAware{ - private static final Log log = LogFactory.getLog(MasterConnector.class); + private static final Log log=LogFactory.getLog(MasterConnector.class); private BrokerService broker; private URI remoteURI; private URI localURI; private Transport localBroker; private Transport remoteBroker; private TransportConnector connector; - private AtomicBoolean masterActive = new AtomicBoolean(false); - private AtomicBoolean started = new AtomicBoolean(false); - private final IdGenerator idGenerator = new IdGenerator(); + private AtomicBoolean started=new AtomicBoolean(false); + private final IdGenerator idGenerator=new IdGenerator(); private String userName; private String password; private ConnectionInfo connectionInfo; private SessionInfo sessionInfo; private ProducerInfo producerInfo; + final AtomicBoolean masterActive = new AtomicBoolean(); - public MasterConnector() { + public MasterConnector(){ } - public MasterConnector(String remoteUri) throws URISyntaxException { - remoteURI = new URI(remoteUri); + public MasterConnector(String remoteUri) throws URISyntaxException{ + remoteURI=new URI(remoteUri); } - public void setBrokerService(BrokerService broker) { - this.broker = broker; - if (localURI == null) { - localURI = broker.getVmConnectorURI(); + public void setBrokerService(BrokerService broker){ + this.broker=broker; + if(localURI==null){ + localURI=broker.getVmConnectorURI(); } - if (connector == null) { - List transportConnectors = broker.getTransportConnectors(); - if (!transportConnectors.isEmpty()) { - connector = (TransportConnector) transportConnectors.get(0); + if(connector==null){ + List transportConnectors=broker.getTransportConnectors(); + if(!transportConnectors.isEmpty()){ + connector=(TransportConnector)transportConnectors.get(0); } } } - public boolean isSlave() { + public boolean isSlave(){ return masterActive.get(); } - public void start() throws Exception { - if (!started.compareAndSet(false, true)) { + public void start() throws Exception{ + if(!started.compareAndSet(false,true)){ return; } - if (remoteURI == null) { + if(remoteURI==null){ throw new IllegalArgumentException("You must specify a remoteURI"); } - localBroker = TransportFactory.connect(localURI); - remoteBroker = TransportFactory.connect(remoteURI); - log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established."); + localBroker=TransportFactory.connect(localURI); + remoteBroker=TransportFactory.connect(remoteURI); + log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); + localBroker.setTransportListener(new DefaultTransportListener(){ - localBroker.setTransportListener(new DefaultTransportListener() { - public void onCommand(Object command) { + public void onCommand(Object command){ } - public void onException(IOException error) { - if (started.get()) { + public void onException(IOException error){ + if(started.get()){ serviceLocalException(error); } } }); + remoteBroker.setTransportListener(new DefaultTransportListener(){ - remoteBroker.setTransportListener(new DefaultTransportListener() { - public void onCommand(Object o) { - Command command = (Command) o; - if (started.get()) { + public void onCommand(Object o){ + Command command=(Command)o; + if(started.get()){ serviceRemoteCommand(command); } } - public void onException(IOException error) { - if (started.get()) { + public void onException(IOException error){ + if(started.get()){ serviceRemoteException(error); } } }); - masterActive.set(true); - Thread thead = new Thread() { - public void run() { - try { + Thread thead=new Thread(){ + + public void run(){ + try{ localBroker.start(); remoteBroker.start(); startBridge(); - } - catch (Exception e) { + }catch(Exception e){ masterActive.set(false); - log.error("Failed to start network bridge: " + e, e); + log.error("Failed to start network bridge: "+e,e); } } }; thead.start(); - } - protected void startBridge() throws Exception { - connectionInfo = new ConnectionInfo(); + protected void startBridge() throws Exception{ + connectionInfo=new ConnectionInfo(); connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); connectionInfo.setClientId(idGenerator.generateId()); connectionInfo.setUserName(userName); connectionInfo.setPassword(password); localBroker.oneway(connectionInfo); - ConnectionInfo remoteInfo = new ConnectionInfo(); + ConnectionInfo remoteInfo=new ConnectionInfo(); connectionInfo.copy(remoteInfo); remoteInfo.setBrokerMasterConnector(true); remoteBroker.oneway(connectionInfo); - - sessionInfo = new SessionInfo(connectionInfo, 1); + sessionInfo=new SessionInfo(connectionInfo,1); localBroker.oneway(sessionInfo); remoteBroker.oneway(sessionInfo); - - producerInfo = new ProducerInfo(sessionInfo, 1); + producerInfo=new ProducerInfo(sessionInfo,1); producerInfo.setResponseRequired(false); remoteBroker.oneway(producerInfo); - - BrokerInfo brokerInfo = null; - if (connector != null) { - - brokerInfo = connector.getBrokerInfo(); - } - else { - brokerInfo = new BrokerInfo(); + BrokerInfo brokerInfo=null; + if(connector!=null){ + brokerInfo=connector.getBrokerInfo(); + }else{ + brokerInfo=new BrokerInfo(); } brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); brokerInfo.setSlaveBroker(true); remoteBroker.oneway(brokerInfo); - - log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established."); + log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established."); } - public void stop() throws Exception { - if (!started.compareAndSet(true, false)) { + public void stop() throws Exception{ + if(!started.compareAndSet(true,false)){ return; } - masterActive.set(false); - try { + try{ // if (connectionInfo!=null){ // localBroker.request(connectionInfo.createRemoveCommand()); // } @@ -202,59 +193,56 @@ public class MasterConnector implements Service, BrokerServiceAware { // remoteBroker.setTransportListener(null); remoteBroker.oneway(new ShutdownInfo()); localBroker.oneway(new ShutdownInfo()); - } - catch (IOException e) { - log.debug("Caught exception stopping", e); - } - finally { - ServiceStopper ss = new ServiceStopper(); + }catch(IOException e){ + log.debug("Caught exception stopping",e); + }finally{ + ServiceStopper ss=new ServiceStopper(); ss.stop(localBroker); ss.stop(remoteBroker); ss.throwFirstException(); } } - protected void serviceRemoteException(IOException error) { - log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); + protected void serviceRemoteException(IOException error){ + log + .error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(), + error); shutDown(); } - protected void serviceRemoteCommand(Command command) { - try { - if (command.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch) command; - command = md.getMessage(); + protected void serviceRemoteCommand(Command command){ + try{ + if(command.isMessageDispatch()){ + MessageDispatch md=(MessageDispatch)command; + command=md.getMessage(); } - if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) { + if(command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){ log.warn("The Master has shutdown"); shutDown(); - - } - else { - boolean responseRequired = command.isResponseRequired(); - int commandId = command.getCommandId(); + }else{ + boolean responseRequired=command.isResponseRequired(); + int commandId=command.getCommandId(); localBroker.oneway(command); - if (responseRequired) { - Response response = new Response(); + if(responseRequired){ + Response response=new Response(); response.setCorrelationId(commandId); remoteBroker.oneway(response); } } - } - catch (IOException e) { + }catch(IOException e){ serviceRemoteException(e); } } - protected void serviceLocalException(Throwable error) { - log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); + protected void serviceLocalException(Throwable error){ + log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); ServiceSupport.dispose(this); } /** * @return Returns the localURI. */ - public URI getLocalURI() { + public URI getLocalURI(){ return localURI; } @@ -262,14 +250,14 @@ public class MasterConnector implements Service, BrokerServiceAware { * @param localURI * The localURI to set. */ - public void setLocalURI(URI localURI) { - this.localURI = localURI; + public void setLocalURI(URI localURI){ + this.localURI=localURI; } /** * @return Returns the remoteURI. */ - public URI getRemoteURI() { + public URI getRemoteURI(){ return remoteURI; } @@ -277,14 +265,14 @@ public class MasterConnector implements Service, BrokerServiceAware { * @param remoteURI * The remoteURI to set. */ - public void setRemoteURI(URI remoteURI) { - this.remoteURI = remoteURI; + public void setRemoteURI(URI remoteURI){ + this.remoteURI=remoteURI; } /** * @return Returns the password. */ - public String getPassword() { + public String getPassword(){ return password; } @@ -292,14 +280,14 @@ public class MasterConnector implements Service, BrokerServiceAware { * @param password * The password to set. */ - public void setPassword(String password) { - this.password = password; + public void setPassword(String password){ + this.password=password; } /** * @return Returns the userName. */ - public String getUserName() { + public String getUserName(){ return userName; } @@ -307,14 +295,13 @@ public class MasterConnector implements Service, BrokerServiceAware { * @param userName * The userName to set. */ - public void setUserName(String userName) { - this.userName = userName; + public void setUserName(String userName){ + this.userName=userName; } - private void shutDown() { + private void shutDown(){ masterActive.set(false); broker.masterFailed(); ServiceSupport.dispose(this); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 67057ce88a..86edd23f88 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -55,7 +54,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; - private AtomicBoolean dispatching=new AtomicBoolean(); + public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor) throws InvalidSelectorException{ @@ -207,7 +206,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } // this only happens after a reconnect - get an ack which is not valid if(!callDispatchMatched){ - log.info("Could not correlate acknowledgment with dispatched message: "+ack); + if (log.isDebugEnabled()) { + log.debug("Could not correlate acknowledgment with dispatched message: "+ack); + } } }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. @@ -376,35 +377,31 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } protected synchronized void dispatchMatched() throws IOException{ - if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){ + if(!broker.isSlaveBroker()){ try{ - try{ - int numberToDispatch=countBeforeFull(); - if(numberToDispatch>0){ - pending.setMaxBatchSize(numberToDispatch); - int count=0; - pending.reset(); - while(pending.hasNext()&&!isFull()&&count0){ + pending.setMaxBatchSize(numberToDispatch); + int count=0; + pending.reset(); + while(pending.hasNext()&&!isFull()&&count