git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@613829 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-21 10:28:59 +00:00
parent 8f190f188b
commit caf7a7c7c2
1 changed files with 115 additions and 101 deletions

View File

@ -595,11 +595,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageAck ack = null; MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (deliveryingAcknowledgements.compareAndSet(false, true)) {
if (this.optimizeAcknowledge) { if (this.optimizeAcknowledge) {
if (!deliveredMessages.isEmpty()) { synchronized(deliveredMessages) {
MessageDispatch md = deliveredMessages.getFirst(); if (!deliveredMessages.isEmpty()) {
ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); MessageDispatch md = deliveredMessages.getFirst();
deliveredMessages.clear(); ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
ackCounter = 0; deliveredMessages.clear();
ackCounter = 0;
}
} }
} }
if (ack != null) { if (ack != null) {
@ -712,7 +714,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId()); md.setDeliverySequenceId(session.getNextDeliveryId());
if (!session.isDupsOkAcknowledge()) { if (!session.isDupsOkAcknowledge()) {
deliveredMessages.addFirst(md); synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
}
if (session.isTransacted()) { if (session.isTransacted()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} }
@ -730,24 +734,26 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (session.isTransacted()) { if (session.isTransacted()) {
// Do nothing. // Do nothing.
} else if (session.isAutoAcknowledge()) { } else if (session.isAutoAcknowledge()) {
if (!deliveredMessages.isEmpty()) { synchronized (deliveredMessages) {
if (optimizeAcknowledge) { if (!deliveredMessages.isEmpty()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (optimizeAcknowledge) {
ackCounter++; if (deliveryingAcknowledgements.compareAndSet(
if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) { false, true)) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, ackCounter++;
deliveredMessages.size()); if (ackCounter >= (info
session.asyncSendPacket(ack); .getCurrentPrefetchSize() * .65)) {
ackCounter = 0; MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
deliveredMessages.clear(); session.asyncSendPacket(ack);
ackCounter = 0;
deliveredMessages.clear();
}
deliveryingAcknowledgements.set(false);
} }
deliveryingAcknowledgements.set(false); } else {
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
} }
} else {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages
.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
} }
} }
} else if (session.isDupsOkAcknowledge()) { } else if (session.isDupsOkAcknowledge()) {
@ -812,30 +818,34 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @throws JMSException * @throws JMSException
*/ */
public void acknowledge() throws JMSException { public void acknowledge() throws JMSException {
if (deliveredMessages.isEmpty()) { synchronized(deliveredMessages) {
return; if (deliveredMessages.isEmpty()) {
} return;
}
// Acknowledge the last message.
MessageDispatch lastMd = deliveredMessages.get(0); // Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size()); MessageDispatch lastMd = deliveredMessages.get(0);
if (session.isTransacted()) { MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
session.doStartTransaction(); if (session.isTransacted()) {
ack.setTransactionId(session.getTransactionContext().getTransactionId()); session.doStartTransaction();
} ack.setTransactionId(session.getTransactionContext().getTransactionId());
session.asyncSendPacket(ack); }
session.asyncSendPacket(ack);
// Adjust the counters
deliveredCounter -= deliveredMessages.size(); // Adjust the counters
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); deliveredCounter -= deliveredMessages.size();
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
if (!session.isTransacted()) {
deliveredMessages.clear(); if (!session.isTransacted()) {
deliveredMessages.clear();
}
} }
} }
public void commit() throws JMSException { public void commit() throws JMSException {
deliveredMessages.clear(); synchronized (deliveredMessages) {
deliveredMessages.clear();
}
redeliveryDelay = 0; redeliveryDelay = 0;
} }
@ -845,74 +855,78 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// remove messages read but not acked at the broker yet through // remove messages read but not acked at the broker yet through
// optimizeAcknowledge // optimizeAcknowledge
if (!this.info.isBrowser()) { if (!this.info.isBrowser()) {
for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { synchronized(deliveredMessages) {
// ensure we don't filter this as a duplicate for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
MessageDispatch md = deliveredMessages.removeLast(); // ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, md.getMessage()); MessageDispatch md = deliveredMessages.removeLast();
session.connection.rollbackDuplicate(this, md.getMessage());
}
} }
} }
} }
if (deliveredMessages.isEmpty()) { synchronized(deliveredMessages) {
return; if (deliveredMessages.isEmpty()) {
} return;
}
// Only increase the redlivery delay after the first redelivery..
MessageDispatch lastMd = deliveredMessages.getFirst(); // Only increase the redlivery delay after the first redelivery..
if (lastMd.getMessage().getRedeliveryCounter() > 0) { MessageDispatch lastMd = deliveredMessages.getFirst();
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); if (lastMd.getMessage().getRedeliveryCounter() > 0) {
} redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next();
md.getMessage().onMessageRolledBack();
}
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack);
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, lastMd.getMessage());
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
redeliveryDelay = 0;
} else {
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack);
// stop the delivery of messages.
unconsumedMessages.stop();
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next(); MessageDispatch md = (MessageDispatch)iter.next();
unconsumedMessages.enqueueFirst(md); md.getMessage().onMessageRolledBack();
} }
if (redeliveryDelay > 0) { if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
// Start up the delivery again a little later. && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
Scheduler.executeAfterDelay(new Runnable() { // We need to NACK the messages so that they get sent to the
public void run() { // DLQ.
try { // Acknowledge the last message.
if (started.get()) {
start(); MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
} session.asyncSendPacket(ack);
} catch (JMSException e) { // ensure we don't filter this as a duplicate
session.connection.onAsyncException(e); session.connection.rollbackDuplicate(this, lastMd.getMessage());
} // Adjust the window size.
} additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
}, redeliveryDelay); redeliveryDelay = 0;
} else { } else {
start();
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack);
// stop the delivery of messages.
unconsumedMessages.stop();
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next();
unconsumedMessages.enqueueFirst(md);
}
if (redeliveryDelay > 0) {
// Start up the delivery again a little later.
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
try {
if (started.get()) {
start();
}
} catch (JMSException e) {
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
} else {
start();
}
} }
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
} }
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
} }
if (messageListener != null) { if (messageListener != null) {
session.redispatch(this, unconsumedMessages); session.redispatch(this, unconsumedMessages);