This commit is contained in:
Clebert Suconic 2019-05-08 18:26:27 -04:00
commit 3f08e7f359
5 changed files with 19 additions and 2 deletions

View File

@ -137,10 +137,15 @@ public class NettyConnection implements Connection {
readyListeners.add(callback);
}
return ready && channel.isOpen();
return ready;
}
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get();

View File

@ -43,6 +43,8 @@ public interface Connection {
boolean isWritable(ReadyListener listener);
boolean isOpen();
/**
* Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control

View File

@ -343,6 +343,11 @@ public class ChannelImplTest {
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isWritable(ReadyListener listener) {
return false;

View File

@ -153,7 +153,7 @@ public final class StompConnection implements RemotingConnection {
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
return transportConnection.isWritable(callback) && transportConnection.isOpen();
}
public boolean hasBytes() {

View File

@ -116,6 +116,11 @@ public class InVMConnection implements Connection {
return true;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public void fireReady(boolean ready) {
}