From c9a3202bc3b526bdf65f239dad68f555d7b83df1 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Mon, 8 Jun 2015 15:51:44 +0100 Subject: [PATCH] https://issues.jboss.org/browse/ENTMQ-780 pauses the delivery of messages to the activemq session when a rollback is happening until the message has been redelivered. patch applied with thanks to Tamas Cserveny --- .../org/apache/activemq/ActiveMQSession.java | 23 +++++++++++++++++-- .../activemq/ActiveMQSessionExecutor.java | 11 +++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 1d2ae836fb..2e0f6468fa 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -717,7 +717,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if (!closed) { try { - executor.stop(); + executor.close(); for (Iterator iter = consumers.iterator(); iter.hasNext();) { ActiveMQMessageConsumer consumer = iter.next(); @@ -978,11 +978,24 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta for (int i = 0; i < redeliveryCounter; i++) { redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); } + + if ( connection.isNonBlockingRedelivery() == false) { + LOG.debug("Blocking session until re-delivery..."); + executor.stop(); + } + connection.getScheduler().executeAfterDelay(new Runnable() { @Override public void run() { - ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); + + if (connection.isNonBlockingRedelivery()) { + ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); + } else { + LOG.debug("Session released, issuing re-delivery..."); + executor.executeFirst(md); + executor.start(); + } } }, redeliveryDelay); } @@ -1016,6 +1029,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if (deliveryListener != null) { deliveryListener.afterDelivery(this, message); } + + try { + executor.waitForQueueRestart(); + } catch (InterruptedException ex) { + connection.onClientInternalException(ex); + } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index caa1ca9884..357815561c 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -207,4 +207,15 @@ public class ActiveMQSessionExecutor implements Task { List getUnconsumedMessages() { return messageQueue.removeAll(); } + + void waitForQueueRestart() throws InterruptedException { + synchronized (messageQueue.getMutex()) { + while (messageQueue.isRunning() == false) { + if (messageQueue.isClosed()) { + break; + } + messageQueue.getMutex().wait(); + } + } + } }