Issue #4824 - add configuration on RemoteEndpoint for maxOutgoingFrames

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-04-30 09:15:49 +10:00
parent ac97bc3b31
commit 71df3b57ee
2 changed files with 77 additions and 0 deletions

View File

@ -141,6 +141,24 @@ public interface RemoteEndpoint
*/
void setBatchMode(BatchMode mode);
/**
* Set the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation.
*
* @param maxOutgoingFrames the max number of frames.
*/
void setMaxOutgoingFrames(int maxOutgoingFrames);
/**
* Get the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation.
*
* @return the max number of frames.
*/
int getMaxOutgoingFrames();
/**
* Get the InetSocketAddress for the established connection.
*

View File

@ -81,7 +81,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private final OutgoingFrames outgoing;
private final AtomicInteger msgState = new AtomicInteger();
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
private final AtomicInteger numOutgoingFrames = new AtomicInteger();
private volatile BatchMode batchMode;
private int maxNumOutgoingFrames = -1;
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
@ -303,6 +305,19 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
BatchMode batchMode = BatchMode.OFF;
if (frame.isDataFrame())
batchMode = getBatchMode();
if (maxNumOutgoingFrames > 0 && frame.isDataFrame())
{
// Increase the number of outgoing frames, will be decremented when callback is completed.
int outgoingFrames = numOutgoingFrames.incrementAndGet();
callback = from(callback, numOutgoingFrames::decrementAndGet);
if (outgoingFrames > maxNumOutgoingFrames)
{
callback.writeFailed(new IOException("Exceeded max outgoing frames: " + outgoingFrames + ">" + maxNumOutgoingFrames));
return;
}
}
outgoing.outgoingFrame(frame, callback, batchMode);
}
@ -439,6 +454,18 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
this.batchMode = batchMode;
}
@Override
public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
this.maxNumOutgoingFrames = maxOutgoingFrames;
}
@Override
public int getMaxOutgoingFrames()
{
return maxNumOutgoingFrames;
}
@Override
public void flush() throws IOException
{
@ -459,4 +486,36 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode());
}
private static WriteCallback from(WriteCallback callback, Runnable completed)
{
return new WriteCallback()
{
@Override
public void writeFailed(Throwable x)
{
try
{
callback.writeFailed(x);
}
finally
{
completed.run();
}
}
@Override
public void writeSuccess()
{
try
{
callback.writeSuccess();
}
finally
{
completed.run();
}
}
};
}
}