ARTEMIS-2333 Applying proper fix on Stomp delivery

When connection is dead, the StompSession may deliver a message and if AUTO-ACK it would ack and lose the message
This commit is contained in:
Clebert Suconic 2019-05-08 18:16:22 -04:00
parent ee674ba63f
commit 408cd3745c
5 changed files with 19 additions and 2 deletions

View File

@ -137,10 +137,15 @@ public class NettyConnection implements Connection {
readyListeners.add(callback); readyListeners.add(callback);
} }
return ready && channel.isOpen(); return ready;
} }
} }
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override @Override
public final void fireReady(final boolean ready) { public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get(); ArrayList<ReadyListener> readyToCall = localListenersPool.get();

View File

@ -43,6 +43,8 @@ public interface Connection {
boolean isWritable(ReadyListener listener); boolean isWritable(ReadyListener listener);
boolean isOpen();
/** /**
* Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses. * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control

View File

@ -343,6 +343,11 @@ public class ChannelImplTest {
} }
@Override
public boolean isOpen() {
return true;
}
@Override @Override
public boolean isWritable(ReadyListener listener) { public boolean isWritable(ReadyListener listener) {
return false; return false;

View File

@ -153,7 +153,7 @@ public final class StompConnection implements RemotingConnection {
@Override @Override
public boolean isWritable(ReadyListener callback) { public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback); return transportConnection.isWritable(callback) && transportConnection.isOpen();
} }
public boolean hasBytes() { public boolean hasBytes() {

View File

@ -116,6 +116,11 @@ public class InVMConnection implements Connection {
return true; return true;
} }
@Override
public boolean isOpen() {
return true;
}
@Override @Override
public void fireReady(boolean ready) { public void fireReady(boolean ready) {
} }