From caf7a7c7c2d1f3491b3650bb1ea73ea999db2bfb Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 21 Jan 2008 10:28:59 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1556 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@613829 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 216 ++++++++++-------- 1 file changed, 115 insertions(+), 101 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index be4a69e739..80a71a7eb1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -595,11 +595,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (this.optimizeAcknowledge) { - if (!deliveredMessages.isEmpty()) { - MessageDispatch md = deliveredMessages.getFirst(); - ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); - deliveredMessages.clear(); - ackCounter = 0; + synchronized(deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + MessageDispatch md = deliveredMessages.getFirst(); + ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); + deliveredMessages.clear(); + ackCounter = 0; + } } } if (ack != null) { @@ -712,7 +714,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); if (!session.isDupsOkAcknowledge()) { - deliveredMessages.addFirst(md); + synchronized(deliveredMessages) { + deliveredMessages.addFirst(md); + } if (session.isTransacted()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } @@ -730,24 +734,26 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (session.isTransacted()) { // Do nothing. } else if (session.isAutoAcknowledge()) { - if (!deliveredMessages.isEmpty()) { - if (optimizeAcknowledge) { - if (deliveryingAcknowledgements.compareAndSet(false, true)) { - ackCounter++; - if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, - deliveredMessages.size()); - session.asyncSendPacket(ack); - ackCounter = 0; - deliveredMessages.clear(); + synchronized (deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + if (optimizeAcknowledge) { + if (deliveryingAcknowledgements.compareAndSet( + false, true)) { + ackCounter++; + if (ackCounter >= (info + .getCurrentPrefetchSize() * .65)) { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + ackCounter = 0; + deliveredMessages.clear(); + } + deliveryingAcknowledgements.set(false); } - deliveryingAcknowledgements.set(false); + } else { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + deliveredMessages.clear(); } - } else { - MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages - .size()); - session.asyncSendPacket(ack); - deliveredMessages.clear(); } } } else if (session.isDupsOkAcknowledge()) { @@ -812,30 +818,34 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * @throws JMSException */ public void acknowledge() throws JMSException { - if (deliveredMessages.isEmpty()) { - return; - } - - // Acknowledge the last message. - MessageDispatch lastMd = deliveredMessages.get(0); - MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); - if (session.isTransacted()) { - session.doStartTransaction(); - ack.setTransactionId(session.getTransactionContext().getTransactionId()); - } - session.asyncSendPacket(ack); - - // Adjust the counters - deliveredCounter -= deliveredMessages.size(); - additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - - if (!session.isTransacted()) { - deliveredMessages.clear(); + synchronized(deliveredMessages) { + if (deliveredMessages.isEmpty()) { + return; + } + + // Acknowledge the last message. + MessageDispatch lastMd = deliveredMessages.get(0); + MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); + if (session.isTransacted()) { + session.doStartTransaction(); + ack.setTransactionId(session.getTransactionContext().getTransactionId()); + } + session.asyncSendPacket(ack); + + // Adjust the counters + deliveredCounter -= deliveredMessages.size(); + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + + if (!session.isTransacted()) { + deliveredMessages.clear(); + } } } public void commit() throws JMSException { - deliveredMessages.clear(); + synchronized (deliveredMessages) { + deliveredMessages.clear(); + } redeliveryDelay = 0; } @@ -845,74 +855,78 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // remove messages read but not acked at the broker yet through // optimizeAcknowledge if (!this.info.isBrowser()) { - for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { - // ensure we don't filter this as a duplicate - MessageDispatch md = deliveredMessages.removeLast(); - session.connection.rollbackDuplicate(this, md.getMessage()); + synchronized(deliveredMessages) { + for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { + // ensure we don't filter this as a duplicate + MessageDispatch md = deliveredMessages.removeLast(); + session.connection.rollbackDuplicate(this, md.getMessage()); + } } } } - if (deliveredMessages.isEmpty()) { - return; - } - - // Only increase the redlivery delay after the first redelivery.. - MessageDispatch lastMd = deliveredMessages.getFirst(); - if (lastMd.getMessage().getRedeliveryCounter() > 0) { - redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); - } - - for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { - MessageDispatch md = (MessageDispatch)iter.next(); - md.getMessage().onMessageRolledBack(); - } - - if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES - && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { - // We need to NACK the messages so that they get sent to the - // DLQ. - // Acknowledge the last message. - - MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); - // ensure we don't filter this as a duplicate - session.connection.rollbackDuplicate(this, lastMd.getMessage()); - // Adjust the window size. - additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - redeliveryDelay = 0; - } else { - - MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); - - // stop the delivery of messages. - unconsumedMessages.stop(); - + synchronized(deliveredMessages) { + if (deliveredMessages.isEmpty()) { + return; + } + + // Only increase the redlivery delay after the first redelivery.. + MessageDispatch lastMd = deliveredMessages.getFirst(); + if (lastMd.getMessage().getRedeliveryCounter() > 0) { + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); + } + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { MessageDispatch md = (MessageDispatch)iter.next(); - unconsumedMessages.enqueueFirst(md); + md.getMessage().onMessageRolledBack(); } - - if (redeliveryDelay > 0) { - // Start up the delivery again a little later. - Scheduler.executeAfterDelay(new Runnable() { - public void run() { - try { - if (started.get()) { - start(); - } - } catch (JMSException e) { - session.connection.onAsyncException(e); - } - } - }, redeliveryDelay); + + if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { + // We need to NACK the messages so that they get sent to the + // DLQ. + // Acknowledge the last message. + + MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); + session.asyncSendPacket(ack); + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, lastMd.getMessage()); + // Adjust the window size. + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + redeliveryDelay = 0; } else { - start(); + + MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); + session.asyncSendPacket(ack); + + // stop the delivery of messages. + unconsumedMessages.stop(); + + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { + MessageDispatch md = (MessageDispatch)iter.next(); + unconsumedMessages.enqueueFirst(md); + } + + if (redeliveryDelay > 0) { + // Start up the delivery again a little later. + Scheduler.executeAfterDelay(new Runnable() { + public void run() { + try { + if (started.get()) { + start(); + } + } catch (JMSException e) { + session.connection.onAsyncException(e); + } + } + }, redeliveryDelay); + } else { + start(); + } + } - + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); } - deliveredCounter -= deliveredMessages.size(); - deliveredMessages.clear(); } if (messageListener != null) { session.redispatch(this, unconsumedMessages);