Added duplicate detection on the client

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@552713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-03 08:19:33 +00:00
parent 5f30e418cb
commit cbaa58b508
5 changed files with 133 additions and 80 deletions

View File

@ -178,6 +178,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// version when a WireFormatInfo is received.
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private long timeCreated;
private ConnectionAudit connectionAudit = new ConnectionAudit();
/**
* Construct an <code>ActiveMQConnection</code>
@ -210,6 +211,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
@ -947,6 +949,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/
protected void removeSession(ActiveMQSession session) {
this.sessions.remove(session);
this.removeDispatcher(session);
}
/**
@ -966,6 +969,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
this.connectionConsumers.remove(connectionConsumer);
this.removeDispatcher(connectionConsumer);
}
/**
@ -2084,5 +2088,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.producerWindowSize = producerWindowSize;
}
protected void removeDispatcher(ActiveMQDispatcher dispatcher){
connectionAudit.removeDispatcher(dispatcher);
}
protected boolean isDuplicate(ActiveMQDispatcher dispatcher,Message message){
return connectionAudit.isDuplicate(dispatcher,message);
}
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,Message message){
connectionAudit.rollbackDuplicate(dispatcher,message);
}
}

View File

@ -36,6 +36,7 @@ import javax.jms.Message;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -89,7 +90,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order. Only used when session is client ack.
private final LinkedList deliveredMessages = new LinkedList();
private final LinkedList <MessageDispatch>deliveredMessages = new LinkedList<MessageDispatch>();
private int deliveredCounter = 0;
private int additionalWindowSize = 0;
private int rollbackCounter = 0;
@ -342,7 +343,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (wasRunning)
session.stop();
session.redispatch(unconsumedMessages);
session.redispatch(this,unconsumedMessages);
if (wasRunning)
session.start();
@ -574,7 +575,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if(deliveryingAcknowledgements.compareAndSet(false,true)){
if(this.optimizeAcknowledge){
if(!deliveredMessages.isEmpty()){
MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
MessageDispatch md=deliveredMessages.getFirst();
ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
deliveredMessages.clear();
ackCounter=0;
@ -602,24 +603,39 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) {
public void dispose() throws JMSException{
if(!unconsumedMessages.isClosed()){
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
deliverAcks();//only processes optimized acknowledgements
if (executorService!=null){
deliverAcks();// only processes optimized acknowledgements
if(executorService!=null){
executorService.shutdown();
try {
executorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
try{
executorService.awaitTermination(60,TimeUnit.SECONDS);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
if((session.isTransacted()||session.isDupsOkAcknowledge())){
acknowledge();
}
if (session.isClientAcknowledge()) {
if(!this.info.isBrowser()){
// rollback duplicates that aren't acknowledged
for(MessageDispatch old:deliveredMessages){
session.connection.rollbackDuplicate(this,old.getMessage());
}
}
}
deliveredMessages.clear();
List<MessageDispatch> list=unconsumedMessages.removeAll();
if(!this.info.isBrowser()){
for(MessageDispatch old:list){
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this,old.getMessage());
}
}
unconsumedMessages.close();
this.session.removeConsumer(this);
}
@ -766,7 +782,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return;
// Acknowledge the last message.
MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0);
MessageDispatch lastMd = deliveredMessages.get(0);
MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
if (session.isTransacted()) {
session.doStartTransaction();
@ -793,8 +809,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
synchronized(unconsumedMessages.getMutex()){
if(optimizeAcknowledge){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
if(!this.info.isBrowser()){
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
// ensure we don't filter this as a duplicate
MessageDispatch md=deliveredMessages.removeLast();
session.connection.rollbackDuplicate(this,md.getMessage());
}
}
}
if(deliveredMessages.isEmpty())
@ -810,9 +830,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0);
MessageDispatch lastMd=deliveredMessages.get(0);
MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
//ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this,lastMd.getMessage());
// Adjust the window size.
additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
rollbackCounter=0;
@ -848,50 +870,63 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredMessages.clear();
}
if(messageListener!=null){
session.redispatch(unconsumedMessages);
session.redispatch(this,unconsumedMessages);
}
}
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener;
try {
public void dispatch(MessageDispatch md){
MessageListener listener=this.messageListener;
try{
synchronized(unconsumedMessages.getMutex()){
if (clearDispatchList) {
if(clearDispatchList){
// we are reconnecting so lets flush the in progress messages
clearDispatchList = false;
unconsumedMessages.clear();
clearDispatchList=false;
List<MessageDispatch> list=unconsumedMessages.removeAll();
if(!this.info.isBrowser()){
for(MessageDispatch old:list){
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this,old.getMessage());
}
}
}
if(!unconsumedMessages.isClosed()){
if(this.info.isBrowser() || session.connection.isDuplicate(this,md.getMessage())==false){
if(listener!=null&&unconsumedMessages.isRunning()){
ActiveMQMessage message=createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try{
listener.onMessage(message);
afterMessageIsConsumed(md,false);
}catch(RuntimeException e){
if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){
// Redeliver the message
}else{
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md,false);
}
log.error("Exception while processing message: "+e,e);
}
}else{
unconsumedMessages.enqueue(md);
if(availableListener!=null){
availableListener.onMessageAvailable(this);
}
}
}else {
//ignore duplicate
if (log.isDebugEnabled()) {
log.debug("Ignoring Duplicate: " + md.getMessage());
}
ackLater(md,MessageAck.STANDARD_ACK_TYPE);
}
}
if (!unconsumedMessages.isClosed()) {
if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
listener.onMessage(message);
afterMessageIsConsumed(md, false);
} catch (RuntimeException e) {
if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
// Redeliver the message
} else {
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false);
}
log.error("Exception while processing message: " + e, e);
}
} else {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
}
}
if (++dispatchedCount%1000==0) {
if(++dispatchedCount%1000==0){
dispatchedCount=0;
Thread.yield();
Thread.yield();
}
} catch (Exception e) {
session.connection.onAsyncException(e);
}catch(Exception e){
session.connection.onAsyncException(e);
}
}

View File

@ -754,7 +754,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if( message.isExpired() ) {
if( message.isExpired() || connection.isDuplicate(ActiveMQSession.this,message)) {
//TODO: Ack it without delivery to client
continue;
}
@ -785,39 +785,35 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if( ack.getTransactionId()!=null ) {
if(ack.getTransactionId()!=null){
getTransactionContext().addSynchronization(new Synchronization(){
public void afterRollback() throws Exception {
public void afterRollback() throws Exception{
md.getMessage().onMessageRolledBack();
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this,md.getMessage());
RedeliveryPolicy redeliveryPolicy=connection.getRedeliveryPolicy();
int redeliveryCounter=md.getMessage().getRedeliveryCounter();
if(redeliveryPolicy.getMaximumRedeliveries()!=RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&&redeliveryCounter>redeliveryPolicy.getMaximumRedeliveries()){
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md,MessageAck.POSION_ACK_TYPE,1);
MessageAck ack=new MessageAck(md,MessageAck.POSION_ACK_TYPE,1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
} else {
}else{
// Figure out how long we should wait to resend this message.
long redeliveryDelay=0;
for( int i=0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
for(int i=0;i<redeliveryCounter;i++){
redeliveryDelay=redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
Scheduler.executeAfterDelay(new Runnable(){
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
public void run(){
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
}
}, redeliveryDelay);
},redeliveryDelay);
}
}
});
@ -1499,6 +1495,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
stats.onRemoveDurableSubscriber();
}
this.consumers.remove(consumer);
this.connection.removeDispatcher(consumer);
}
/**
@ -1765,9 +1762,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
return deliveryIdGenerator.getNextSequenceId();
}
public void redispatch(MessageDispatchChannel unconsumedMessages) throws JMSException {
public void redispatch(ActiveMQDispatcher dispatcher,MessageDispatchChannel unconsumedMessages) throws JMSException {
List c = unconsumedMessages.removeAll();
List <MessageDispatch>c = unconsumedMessages.removeAll();
for (MessageDispatch md: c) {
this.connection.rollbackDuplicate(dispatcher,md.getMessage());
}
Collections.reverse(c);
for (Iterator iter = c.iterator(); iter.hasNext();) {

View File

@ -57,6 +57,7 @@ public class ActiveMQSessionExecutor implements Task {
void execute(MessageDispatch message) throws InterruptedException {
if (!startedOrWarnedThatNotStarted) {
ActiveMQConnection connection = session.connection;
@ -119,6 +120,7 @@ public class ActiveMQSessionExecutor implements Task {
ConsumerId consumerId = message.getConsumerId();
if( consumerId.equals(consumer.getConsumerId()) ) {
consumer.dispatch(message);
break;
}
}
}

View File

@ -28,12 +28,12 @@ import org.apache.activemq.command.MessageDispatch;
public class MessageDispatchChannel {
private final Object mutex = new Object();
private final LinkedList list;
private final LinkedList<MessageDispatch> list;
private boolean closed;
private boolean running;
public MessageDispatchChannel() {
this.list = new LinkedList();
this.list = new LinkedList<MessageDispatch>();
}
public void enqueue(MessageDispatch message) {
@ -84,7 +84,7 @@ public class MessageDispatchChannel {
if (closed || !running || list.isEmpty()) {
return null;
}
return (MessageDispatch) list.removeFirst();
return list.removeFirst();
}
}
@ -93,7 +93,7 @@ public class MessageDispatchChannel {
if (closed || !running || list.isEmpty()) {
return null;
}
return (MessageDispatch) list.removeFirst();
return list.removeFirst();
}
}
@ -102,7 +102,7 @@ public class MessageDispatchChannel {
if (closed || !running || list.isEmpty()) {
return null;
}
return (MessageDispatch) list.getFirst();
return list.getFirst();
}
}
@ -154,9 +154,9 @@ public class MessageDispatchChannel {
return running;
}
public List removeAll() {
public List<MessageDispatch> removeAll() {
synchronized(mutex) {
ArrayList rc = new ArrayList(list);
ArrayList <MessageDispatch>rc = new ArrayList<MessageDispatch>(list);
list.clear();
return rc;
}