pauses the delivery of messages to the activemq session when a rollback is happening until the message has been redelivered.

patch applied with thanks to Tamas Cserveny
This commit is contained in:
Andy Taylor 2015-06-08 15:51:44 +01:00
parent af999fe2b2
commit c9a3202bc3
2 changed files with 32 additions and 2 deletions

View File

@ -717,7 +717,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (!closed) { if (!closed) {
try { try {
executor.stop(); executor.close();
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer consumer = iter.next(); ActiveMQMessageConsumer consumer = iter.next();
@ -978,11 +978,24 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
for (int i = 0; i < redeliveryCounter; i++) { for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
} }
if ( connection.isNonBlockingRedelivery() == false) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() { connection.getScheduler().executeAfterDelay(new Runnable() {
@Override @Override
public void run() { public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
} else {
LOG.debug("Session released, issuing re-delivery...");
executor.executeFirst(md);
executor.start();
}
} }
}, redeliveryDelay); }, redeliveryDelay);
} }
@ -1016,6 +1029,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (deliveryListener != null) { if (deliveryListener != null) {
deliveryListener.afterDelivery(this, message); deliveryListener.afterDelivery(this, message);
} }
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
connection.onClientInternalException(ex);
}
} }
} }

View File

@ -207,4 +207,15 @@ public class ActiveMQSessionExecutor implements Task {
List<MessageDispatch> getUnconsumedMessages() { List<MessageDispatch> getUnconsumedMessages() {
return messageQueue.removeAll(); return messageQueue.removeAll();
} }
void waitForQueueRestart() throws InterruptedException {
synchronized (messageQueue.getMutex()) {
while (messageQueue.isRunning() == false) {
if (messageQueue.isClosed()) {
break;
}
messageQueue.getMutex().wait();
}
}
}
} }