From 56d68f0b723ccbca8ec4ffbc26a9c1d0be9c0002 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 22 Jan 2018 16:06:48 +0100 Subject: [PATCH] ARTEMIS-1607 OpenWire is sending responses too early with durable messages AMQSession is sending response back without waiting past I/O tasks on StorageManager to be finished --- .../protocol/openwire/amq/AMQSession.java | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index d6fe39025f..f4de1af867 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import javax.jms.InvalidDestinationException; import javax.jms.ResourceAllocationException; +import java.io.IOException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,6 +28,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; @@ -428,26 +430,42 @@ public class AMQSession implements SessionCallback { this.connection.getContext().setDontSendReponse(false); connection.sendException(exceptionToSend); } else { - if (sendProducerAck) { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchAsync(ack); - } catch (Exception e) { - this.connection.getContext().setDontSendReponse(false); - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); + server.getStorageManager().afterCompleteOperations(new IOCallback() { + @Override + public void done() { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { + connection.getContext().setDontSendReponse(false); + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } else { + connection.getContext().setDontSendReponse(false); + try { + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } } - } else { - connection.getContext().setDontSendReponse(false); - try { - Response response = new Response(); - response.setCorrelationId(messageSend.getCommandId()); - connection.dispatchAsync(response); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); + + @Override + public void onError(int errorCode, String errorMessage) { + try { + final IOException e = new IOException(errorMessage); + ActiveMQServerLogger.LOGGER.warn(errorMessage); + connection.serviceException(e); + } catch (Exception ex) { + ActiveMQServerLogger.LOGGER.debug(ex); + } } - } + }); } } })) {