Added duplicate detection to the TransactionBroker - so can cope with rollbacks etc.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@552738 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-03 08:31:10 +00:00
parent c7469c433c
commit 25a252f348
16 changed files with 253 additions and 184 deletions

View File

@ -250,4 +250,8 @@ public interface Broker extends Region, Service {
* @return the URI that can be used to connect to the local Broker * @return the URI that can be used to connect to the local Broker
*/ */
public URI getVmConnectorURI(); public URI getVmConnectorURI();
public void brokerServiceStarted();
BrokerService getBrokerService();
} }

View File

@ -242,5 +242,12 @@ public class BrokerFilter implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
return next.getVmConnectorURI(); return next.getVmConnectorURI();
} }
public void brokerServiceStarted(){
next.brokerServiceStarted();
}
public BrokerService getBrokerService(){
return next.getBrokerService();
}
} }

View File

@ -153,6 +153,7 @@ public class BrokerService implements Service {
private int persistenceThreadPriority = Thread.MAX_PRIORITY; private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false; private boolean useLocalHostBrokerName = false;
private CountDownLatch stoppedLatch = new CountDownLatch(1); private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver = false;
static{ static{
String localHostName = "localhost"; String localHostName = "localhost";
@ -364,7 +365,7 @@ public class BrokerService implements Service {
/** /**
* @return true if this Broker is a slave to a Master * @return true if this Broker is a slave to a Master
*/ */
public boolean isSlave(){ public synchronized boolean isSlave(){
return masterConnector != null && masterConnector.isSlave(); return masterConnector != null && masterConnector.isSlave();
} }
@ -436,6 +437,7 @@ public class BrokerService implements Service {
brokerId = broker.getBrokerId(); brokerId = broker.getBrokerId();
log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started"); log.info("ActiveMQ JMS Message Broker (" + getBrokerName()+", "+brokerId+") started");
getBroker().brokerServiceStarted();
} }
catch (Exception e) { catch (Exception e) {
log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e); log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
@ -486,7 +488,6 @@ public class BrokerService implements Service {
VMTransportFactory.stopped(getBrokerName()); VMTransportFactory.stopped(getBrokerName());
stopped.set(true); stopped.set(true);
stoppedLatch.countDown(); stoppedLatch.countDown();
log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped"); log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
stopper.throwFirstException(); stopper.throwFirstException();
} }
@ -1104,6 +1105,20 @@ public class BrokerService implements Service {
brokerName=LOCAL_HOST_NAME; brokerName=LOCAL_HOST_NAME;
} }
} }
/**
* @return the supportFailOver
*/
public boolean isSupportFailOver(){
return this.supportFailOver;
}
/**
* @param supportFailOver the supportFailOver to set
*/
public void setSupportFailOver(boolean supportFailOver){
this.supportFailOver=supportFailOver;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -1649,6 +1664,7 @@ public class BrokerService implements Service {
} }
if (service instanceof MasterConnector) { if (service instanceof MasterConnector) {
masterConnector = (MasterConnector) service; masterConnector = (MasterConnector) service;
supportFailOver=true;
} }
} }
@ -1675,8 +1691,6 @@ public class BrokerService implements Service {
broker.addDestination(adminConnectionContext, destination); broker.addDestination(adminConnectionContext, destination);
} }
} }
} }
} }

View File

@ -241,5 +241,11 @@ public class EmptyBroker implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
return null; return null;
} }
public void brokerServiceStarted(){
}
public BrokerService getBrokerService(){
return null;
}
} }

View File

@ -240,5 +240,12 @@ public class ErrorBroker implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public void brokerServiceStarted(){
throw new BrokerStoppedException(this.message);
}
public BrokerService getBrokerService(){
throw new BrokerStoppedException(this.message);
}
} }

View File

@ -254,5 +254,13 @@ public class MutableBrokerFilter implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
return getNext().getVmConnectorURI(); return getNext().getVmConnectorURI();
} }
public void brokerServiceStarted(){
getNext().brokerServiceStarted();
}
public BrokerService getBrokerService(){
return getNext().getBrokerService();
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -28,6 +29,7 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.transaction.LocalTransaction; import org.apache.activemq.transaction.LocalTransaction;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transaction.XATransaction; import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -36,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
import java.util.ArrayList; import java.util.ArrayList;
@ -55,6 +58,7 @@ public class TransactionBroker extends BrokerFilter {
// The prepared XA transactions. // The prepared XA transactions.
private TransactionStore transactionStore; private TransactionStore transactionStore;
private Map xaTransactions = new LinkedHashMap(); private Map xaTransactions = new LinkedHashMap();
ActiveMQMessageAudit audit;
public TransactionBroker(Broker next, TransactionStore transactionStore) { public TransactionBroker(Broker next, TransactionStore transactionStore) {
super(next); super(next);
@ -189,20 +193,41 @@ public class TransactionBroker extends BrokerFilter {
} }
} }
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange,final Message message) throws Exception{
// This method may be invoked recursively. // This method may be invoked recursively.
// Track original tx so that it can be restored. // Track original tx so that it can be restored.
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context=producerExchange.getConnectionContext();
Transaction originalTx = context.getTransaction(); Transaction originalTx=context.getTransaction();
Transaction transaction=null; Transaction transaction=null;
if( message.getTransactionId()!=null ) { Synchronization sync=null;
transaction = getTransaction(context, message.getTransactionId(), false); if(message.getTransactionId()!=null){
transaction=getTransaction(context,message.getTransactionId(),false);
if(transaction!=null){
sync=new Synchronization(){
public void afterRollback(){
if(audit!=null){
audit.rollbackMessageReference(message);
}
}
};
transaction.addSynchronization(sync);
}
} }
context.setTransaction(transaction); if(audit==null||!audit.isDuplicateMessageReference(message)){
try { context.setTransaction(transaction);
next.send(producerExchange, message); try{
} finally { next.send(producerExchange,message);
context.setTransaction(originalTx); }finally{
context.setTransaction(originalTx);
}
}else{
if(sync!=null&&transaction!=null){
transaction.removeSynchronization(sync);
}
if(log.isDebugEnabled()){
log.debug("IGNORING duplicate message "+message);
}
} }
} }
@ -248,5 +273,12 @@ public class TransactionBroker extends BrokerFilter {
xaTransactions.remove(xid); xaTransactions.remove(xid);
} }
} }
public synchronized void brokerServiceStarted(){
super.brokerServiceStarted();
if(getBrokerService().isSupportFailOver()&&audit==null){
audit=new ActiveMQMessageAudit();
}
}
} }

View File

@ -479,24 +479,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
public Response processMessage(Message messageSend) throws Exception{ public Response processMessage(Message messageSend) throws Exception{
ProducerId producerId=messageSend.getProducerId(); ProducerId producerId=messageSend.getProducerId();
ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId); ProducerBrokerExchange producerExchange=getProducerBrokerExchange(producerId);
ProducerState producerState = null; broker.send(producerExchange,messageSend);
if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
producerState=producerExchange.getProducerState();
}
if(producerState!=null){
long seq=messageSend.getMessageId().getProducerSequenceId();
if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq);
broker.send(producerExchange,messageSend);
}else {
if (log.isDebugEnabled()) {
log.debug("Discarding duplicate: " + messageSend);
}
}
}else{
// producer not local to this broker
broker.send(producerExchange,messageSend);
}
return null; return null;
} }

View File

@ -132,6 +132,7 @@ public class TransportConnector implements Connector {
this.broker = broker; this.broker = broker;
brokerInfo.setBrokerId(broker.getBrokerId()); brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
} }
public void setBrokerName(String brokerName) { public void setBrokerName(String brokerName) {

View File

@ -288,9 +288,11 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
mdn.setConsumerId(messageDispatch.getConsumerId()); mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination()); mdn.setDestination(messageDispatch.getDestination());
if(messageDispatch.getMessage()!=null) if(messageDispatch.getMessage()!=null){
mdn.setMessageId(messageDispatch.getMessage().getMessageId()); Message msg=messageDispatch.getMessage();
sendAsyncToSlave(mdn); mdn.setMessageId(msg.getMessageId());
sendAsyncToSlave(mdn);
}
super.processDispatch(messageDispatch); super.processDispatch(messageDispatch);
} }

View File

@ -55,146 +55,137 @@ import java.util.concurrent.atomic.AtomicBoolean;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class MasterConnector implements Service, BrokerServiceAware { public class MasterConnector implements Service,BrokerServiceAware{
private static final Log log = LogFactory.getLog(MasterConnector.class); private static final Log log=LogFactory.getLog(MasterConnector.class);
private BrokerService broker; private BrokerService broker;
private URI remoteURI; private URI remoteURI;
private URI localURI; private URI localURI;
private Transport localBroker; private Transport localBroker;
private Transport remoteBroker; private Transport remoteBroker;
private TransportConnector connector; private TransportConnector connector;
private AtomicBoolean masterActive = new AtomicBoolean(false); private AtomicBoolean started=new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false); private final IdGenerator idGenerator=new IdGenerator();
private final IdGenerator idGenerator = new IdGenerator();
private String userName; private String userName;
private String password; private String password;
private ConnectionInfo connectionInfo; private ConnectionInfo connectionInfo;
private SessionInfo sessionInfo; private SessionInfo sessionInfo;
private ProducerInfo producerInfo; private ProducerInfo producerInfo;
final AtomicBoolean masterActive = new AtomicBoolean();
public MasterConnector() { public MasterConnector(){
} }
public MasterConnector(String remoteUri) throws URISyntaxException { public MasterConnector(String remoteUri) throws URISyntaxException{
remoteURI = new URI(remoteUri); remoteURI=new URI(remoteUri);
} }
public void setBrokerService(BrokerService broker) { public void setBrokerService(BrokerService broker){
this.broker = broker; this.broker=broker;
if (localURI == null) { if(localURI==null){
localURI = broker.getVmConnectorURI(); localURI=broker.getVmConnectorURI();
} }
if (connector == null) { if(connector==null){
List transportConnectors = broker.getTransportConnectors(); List transportConnectors=broker.getTransportConnectors();
if (!transportConnectors.isEmpty()) { if(!transportConnectors.isEmpty()){
connector = (TransportConnector) transportConnectors.get(0); connector=(TransportConnector)transportConnectors.get(0);
} }
} }
} }
public boolean isSlave() { public boolean isSlave(){
return masterActive.get(); return masterActive.get();
} }
public void start() throws Exception { public void start() throws Exception{
if (!started.compareAndSet(false, true)) { if(!started.compareAndSet(false,true)){
return; return;
} }
if (remoteURI == null) { if(remoteURI==null){
throw new IllegalArgumentException("You must specify a remoteURI"); throw new IllegalArgumentException("You must specify a remoteURI");
} }
localBroker = TransportFactory.connect(localURI); localBroker=TransportFactory.connect(localURI);
remoteBroker = TransportFactory.connect(remoteURI); remoteBroker=TransportFactory.connect(remoteURI);
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established."); log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new DefaultTransportListener(){
localBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object command){
public void onCommand(Object command) {
} }
public void onException(IOException error) { public void onException(IOException error){
if (started.get()) { if(started.get()){
serviceLocalException(error); serviceLocalException(error);
} }
} }
}); });
remoteBroker.setTransportListener(new DefaultTransportListener(){
remoteBroker.setTransportListener(new DefaultTransportListener() { public void onCommand(Object o){
public void onCommand(Object o) { Command command=(Command)o;
Command command = (Command) o; if(started.get()){
if (started.get()) {
serviceRemoteCommand(command); serviceRemoteCommand(command);
} }
} }
public void onException(IOException error) { public void onException(IOException error){
if (started.get()) { if(started.get()){
serviceRemoteException(error); serviceRemoteException(error);
} }
} }
}); });
masterActive.set(true); masterActive.set(true);
Thread thead = new Thread() { Thread thead=new Thread(){
public void run() {
try { public void run(){
try{
localBroker.start(); localBroker.start();
remoteBroker.start(); remoteBroker.start();
startBridge(); startBridge();
} }catch(Exception e){
catch (Exception e) {
masterActive.set(false); masterActive.set(false);
log.error("Failed to start network bridge: " + e, e); log.error("Failed to start network bridge: "+e,e);
} }
} }
}; };
thead.start(); thead.start();
} }
protected void startBridge() throws Exception { protected void startBridge() throws Exception{
connectionInfo = new ConnectionInfo(); connectionInfo=new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(idGenerator.generateId()); connectionInfo.setClientId(idGenerator.generateId());
connectionInfo.setUserName(userName); connectionInfo.setUserName(userName);
connectionInfo.setPassword(password); connectionInfo.setPassword(password);
localBroker.oneway(connectionInfo); localBroker.oneway(connectionInfo);
ConnectionInfo remoteInfo = new ConnectionInfo(); ConnectionInfo remoteInfo=new ConnectionInfo();
connectionInfo.copy(remoteInfo); connectionInfo.copy(remoteInfo);
remoteInfo.setBrokerMasterConnector(true); remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo); remoteBroker.oneway(connectionInfo);
sessionInfo=new SessionInfo(connectionInfo,1);
sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo); localBroker.oneway(sessionInfo);
remoteBroker.oneway(sessionInfo); remoteBroker.oneway(sessionInfo);
producerInfo=new ProducerInfo(sessionInfo,1);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false); producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo); remoteBroker.oneway(producerInfo);
BrokerInfo brokerInfo=null;
BrokerInfo brokerInfo = null; if(connector!=null){
if (connector != null) { brokerInfo=connector.getBrokerInfo();
}else{
brokerInfo = connector.getBrokerInfo(); brokerInfo=new BrokerInfo();
}
else {
brokerInfo = new BrokerInfo();
} }
brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true); brokerInfo.setSlaveBroker(true);
remoteBroker.oneway(brokerInfo); remoteBroker.oneway(brokerInfo);
log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
log.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
} }
public void stop() throws Exception { public void stop() throws Exception{
if (!started.compareAndSet(true, false)) { if(!started.compareAndSet(true,false)){
return; return;
} }
masterActive.set(false); masterActive.set(false);
try { try{
// if (connectionInfo!=null){ // if (connectionInfo!=null){
// localBroker.request(connectionInfo.createRemoveCommand()); // localBroker.request(connectionInfo.createRemoveCommand());
// } // }
@ -202,59 +193,56 @@ public class MasterConnector implements Service, BrokerServiceAware {
// remoteBroker.setTransportListener(null); // remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo()); remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo()); localBroker.oneway(new ShutdownInfo());
} }catch(IOException e){
catch (IOException e) { log.debug("Caught exception stopping",e);
log.debug("Caught exception stopping", e); }finally{
} ServiceStopper ss=new ServiceStopper();
finally {
ServiceStopper ss = new ServiceStopper();
ss.stop(localBroker); ss.stop(localBroker);
ss.stop(remoteBroker); ss.stop(remoteBroker);
ss.throwFirstException(); ss.throwFirstException();
} }
} }
protected void serviceRemoteException(IOException error) { protected void serviceRemoteException(IOException error){
log.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); log
.error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),
error);
shutDown(); shutDown();
} }
protected void serviceRemoteCommand(Command command) { protected void serviceRemoteCommand(Command command){
try { try{
if (command.isMessageDispatch()) { if(command.isMessageDispatch()){
MessageDispatch md = (MessageDispatch) command; MessageDispatch md=(MessageDispatch)command;
command = md.getMessage(); command=md.getMessage();
} }
if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) { if(command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
log.warn("The Master has shutdown"); log.warn("The Master has shutdown");
shutDown(); shutDown();
}else{
} boolean responseRequired=command.isResponseRequired();
else { int commandId=command.getCommandId();
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
localBroker.oneway(command); localBroker.oneway(command);
if (responseRequired) { if(responseRequired){
Response response = new Response(); Response response=new Response();
response.setCorrelationId(commandId); response.setCorrelationId(commandId);
remoteBroker.oneway(response); remoteBroker.oneway(response);
} }
} }
} }catch(IOException e){
catch (IOException e) {
serviceRemoteException(e); serviceRemoteException(e);
} }
} }
protected void serviceLocalException(Throwable error) { protected void serviceLocalException(Throwable error){
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
/** /**
* @return Returns the localURI. * @return Returns the localURI.
*/ */
public URI getLocalURI() { public URI getLocalURI(){
return localURI; return localURI;
} }
@ -262,14 +250,14 @@ public class MasterConnector implements Service, BrokerServiceAware {
* @param localURI * @param localURI
* The localURI to set. * The localURI to set.
*/ */
public void setLocalURI(URI localURI) { public void setLocalURI(URI localURI){
this.localURI = localURI; this.localURI=localURI;
} }
/** /**
* @return Returns the remoteURI. * @return Returns the remoteURI.
*/ */
public URI getRemoteURI() { public URI getRemoteURI(){
return remoteURI; return remoteURI;
} }
@ -277,14 +265,14 @@ public class MasterConnector implements Service, BrokerServiceAware {
* @param remoteURI * @param remoteURI
* The remoteURI to set. * The remoteURI to set.
*/ */
public void setRemoteURI(URI remoteURI) { public void setRemoteURI(URI remoteURI){
this.remoteURI = remoteURI; this.remoteURI=remoteURI;
} }
/** /**
* @return Returns the password. * @return Returns the password.
*/ */
public String getPassword() { public String getPassword(){
return password; return password;
} }
@ -292,14 +280,14 @@ public class MasterConnector implements Service, BrokerServiceAware {
* @param password * @param password
* The password to set. * The password to set.
*/ */
public void setPassword(String password) { public void setPassword(String password){
this.password = password; this.password=password;
} }
/** /**
* @return Returns the userName. * @return Returns the userName.
*/ */
public String getUserName() { public String getUserName(){
return userName; return userName;
} }
@ -307,14 +295,13 @@ public class MasterConnector implements Service, BrokerServiceAware {
* @param userName * @param userName
* The userName to set. * The userName to set.
*/ */
public void setUserName(String userName) { public void setUserName(String userName){
this.userName = userName; this.userName=userName;
} }
private void shutDown() { private void shutDown(){
masterActive.set(false); masterActive.set(false);
broker.masterFailed(); broker.masterFailed();
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
} }

View File

@ -17,7 +17,6 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -55,7 +54,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected long enqueueCounter; protected long enqueueCounter;
protected long dispatchCounter; protected long dispatchCounter;
protected long dequeueCounter; protected long dequeueCounter;
private AtomicBoolean dispatching=new AtomicBoolean();
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor) public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor)
throws InvalidSelectorException{ throws InvalidSelectorException{
@ -207,7 +206,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
// this only happens after a reconnect - get an ack which is not valid // this only happens after a reconnect - get an ack which is not valid
if(!callDispatchMatched){ if(!callDispatchMatched){
log.info("Could not correlate acknowledgment with dispatched message: "+ack); if (log.isDebugEnabled()) {
log.debug("Could not correlate acknowledgment with dispatched message: "+ack);
}
} }
}else if(ack.isDeliveredAck()){ }else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters. // Message was delivered but not acknowledged: update pre-fetch counters.
@ -376,35 +377,31 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
protected synchronized void dispatchMatched() throws IOException{ protected synchronized void dispatchMatched() throws IOException{
if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){ if(!broker.isSlaveBroker()){
try{ try{
try{ int numberToDispatch=countBeforeFull();
int numberToDispatch=countBeforeFull(); if(numberToDispatch>0){
if(numberToDispatch>0){ pending.setMaxBatchSize(numberToDispatch);
pending.setMaxBatchSize(numberToDispatch); int count=0;
int count=0; pending.reset();
pending.reset(); while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
while(pending.hasNext()&&!isFull()&&count<numberToDispatch){ MessageReference node=pending.next();
MessageReference node=pending.next(); if(node==null)
if(node==null) break;
break; if(canDispatch(node)){
if(canDispatch(node)){ pending.remove();
pending.remove(); // Message may have been sitting in the pending list a while
// Message may have been sitting in the pending list a while // waiting for the consumer to ak the message.
// waiting for the consumer to ak the message. if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ continue; // just drop it.
continue; // just drop it.
}
dispatch(node);
count++;
} }
dispatch(node);
count++;
} }
} }
}finally{
pending.release();
} }
}finally{ }finally{
dispatching.set(false); pending.release();
} }
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.Set;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
@ -35,6 +36,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@ -63,6 +65,8 @@ import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -74,7 +78,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
* @version $Revision$ * @version $Revision$
*/ */
public class RegionBroker implements Broker { public class RegionBroker implements Broker {
private static final Log log = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator brokerIdGenerator = new IdGenerator(); private static final IdGenerator brokerIdGenerator = new IdGenerator();
private final Region queueRegion; private final Region queueRegion;
@ -99,6 +103,7 @@ public class RegionBroker implements Broker {
private ConnectionContext adminConnectionContext; private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory; protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
@ -378,26 +383,26 @@ public class RegionBroker implements Broker {
topicRegion.removeSubscription(context, info); topicRegion.removeSubscription(context, info);
} }
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange,Message message) throws Exception{
long si = sequenceGenerator.getNextSequenceId(); long si=sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si); message.getMessageId().setBrokerSequenceId(si);
if (producerExchange.isMutable() || producerExchange.getRegion()==null) { if(producerExchange.isMutable()||producerExchange.getRegion()==null){
ActiveMQDestination destination = message.getDestination(); ActiveMQDestination destination=message.getDestination();
//ensure the destination is registered with the RegionBroker // ensure the destination is registered with the RegionBroker
addDestination(producerExchange.getConnectionContext(),destination); addDestination(producerExchange.getConnectionContext(),destination);
Region region = null; Region region=null;
switch(destination.getDestinationType()) { switch(destination.getDestinationType()){
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
region = queueRegion; region=queueRegion;
break; break;
case ActiveMQDestination.TOPIC_TYPE: case ActiveMQDestination.TOPIC_TYPE:
region = topicRegion; region=topicRegion;
break; break;
case ActiveMQDestination.TEMP_QUEUE_TYPE: case ActiveMQDestination.TEMP_QUEUE_TYPE:
region = tempQueueRegion; region=tempQueueRegion;
break; break;
case ActiveMQDestination.TEMP_TOPIC_TYPE: case ActiveMQDestination.TEMP_TOPIC_TYPE:
region = tempTopicRegion; region=tempTopicRegion;
break; break;
default: default:
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
@ -613,4 +618,13 @@ public class RegionBroker implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
return brokerService.getVmConnectorURI(); return brokerService.getVmConnectorURI();
} }
public void brokerServiceStarted(){
}
public BrokerService getBrokerService(){
return brokerService;
}
} }

View File

@ -33,10 +33,5 @@ public class ProducerState {
public ProducerInfo getInfo() { public ProducerInfo getInfo() {
return info; return info;
} }
public void setLastSequenceId(long lastSequenceId) {
this.lastSequenceId = lastSequenceId;
}
public long getLastSequenceId() {
return lastSequenceId;
}
} }

View File

@ -55,6 +55,10 @@ public abstract class Transaction {
state = IN_USE_STATE; state = IN_USE_STATE;
} }
} }
public void removeSynchronization(Synchronization r) {
synchronizations.remove(r);
}
public void prePrepare() throws Exception { public void prePrepare() throws Exception {

View File

@ -236,4 +236,12 @@ public class StubBroker implements Broker {
public URI getVmConnectorURI(){ public URI getVmConnectorURI(){
return null; return null;
} }
public void brokerServiceStarted(){
}
public BrokerService getBrokerService(){
return null;
}
} }