improved the patch because of some test failures
This commit is contained in:
Andy Taylor 2015-06-12 09:55:41 +01:00
parent 33fb7c6096
commit 151e25117b
1 changed files with 142 additions and 102 deletions

View File

@ -200,6 +200,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
private static final Object REDELIVERY_GUARD = new Object();
private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode;
@ -916,125 +917,164 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() {
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
/*
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
synchronized (REDELIVERY_GUARD) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() {
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend
// this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
if ( connection.isNonBlockingRedelivery() == false) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void run() {
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
} else {
LOG.debug("Session released, issuing re-delivery...");
executor.executeFirst(md);
executor.start();
}
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend
// this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
}, redeliveryDelay);
/*
* If we are a non blocking delivery then we need to stop the executor to avoid more
* messages being delivered, once the message is redelivered we can restart it.
* */
if (!connection.isNonBlockingRedelivery()) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
synchronized (REDELIVERY_GUARD) {
/*
* If its non blocking then we can just dispatch in a new session.
* */
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
/*
* If there has been an error thrown during afterDelivery then the
* endpoint will be marked as dead so redelivery will fail (and eventually
* the session marked as stale), in this case we can only call dispatch
* which will create a new session with a new endpoint.
* */
if (afterDeliveryError.get()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
executor.executeFirst(md);
executor.start();
}
}
}
}
}, redeliveryDelay);
}
md.getMessage().onMessageRolledBack();
}
md.getMessage().onMessageRolledBack();
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
});
}
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
if (ack.getTransactionId() == null) {
if (deliveryListener != null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
deliveryListener.afterDelivery(this, message);
} catch (Throwable t) {
LOG.debug("Unable to call after delivery", t);
afterDeliveryError.set(true);
throw t;
}
}
}
if (deliveryListener != null) {
deliveryListener.afterDelivery(this, message);
}
/*
* this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
* It also needs to be outside the redelivery guard.
* */
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
connection.onClientInternalException(ex);
}
}
}
}
@ -2118,7 +2158,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override
public String toString() {
return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex;
}
public void checkMessageListener() throws JMSException {