From 408cd3745c633d6b54bfcafe5373251b183696aa Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 8 May 2019 18:16:22 -0400 Subject: [PATCH] ARTEMIS-2333 Applying proper fix on Stomp delivery When connection is dead, the StompSession may deliver a message and if AUTO-ACK it would ack and lose the message --- .../artemis/core/remoting/impl/netty/NettyConnection.java | 7 ++++++- .../activemq/artemis/spi/core/remoting/Connection.java | 2 ++ .../artemis/core/protocol/core/impl/ChannelImplTest.java | 5 +++++ .../artemis/core/protocol/stomp/StompConnection.java | 2 +- .../artemis/core/remoting/impl/invm/InVMConnection.java | 5 +++++ 5 files changed, 19 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 8029931791..51330c727b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -137,10 +137,15 @@ public class NettyConnection implements Connection { readyListeners.add(callback); } - return ready && channel.isOpen(); + return ready; } } + @Override + public boolean isOpen() { + return channel.isOpen(); + } + @Override public final void fireReady(final boolean ready) { ArrayList readyToCall = localListenersPool.get(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 0683a50224..ebde456034 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -43,6 +43,8 @@ public interface Connection { boolean isWritable(ReadyListener listener); + boolean isOpen(); + /** * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses. * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 3f6bdcaa8c..9908d974d2 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -343,6 +343,11 @@ public class ChannelImplTest { } + @Override + public boolean isOpen() { + return true; + } + @Override public boolean isWritable(ReadyListener listener) { return false; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 92b6edc46f..8c32281041 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -153,7 +153,7 @@ public final class StompConnection implements RemotingConnection { @Override public boolean isWritable(ReadyListener callback) { - return transportConnection.isWritable(callback); + return transportConnection.isWritable(callback) && transportConnection.isOpen(); } public boolean hasBytes() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index ba4b9970e4..b2fc576db3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -116,6 +116,11 @@ public class InVMConnection implements Connection { return true; } + @Override + public boolean isOpen() { + return true; + } + @Override public void fireReady(boolean ready) { }