Issue #6642 - never shutdown output after generating a request.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-08-26 14:18:28 +10:00
parent fd6c72c668
commit 83f2265653
6 changed files with 33 additions and 32 deletions

View File

@ -21,7 +21,11 @@ import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class BufferCallbackAccumulator
/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*/
public class ByteBufferCallbackAccumulator
{
private final List<Entry> _entries = new ArrayList<>();
private int _length;
@ -45,8 +49,6 @@ public class BufferCallbackAccumulator
}
/**
* Get the amount of bytes which have been accumulated.
* This will add up the remaining of each buffer in the accumulator.
* @return the total length of the content in the accumulator.
*/
public int getLength()

View File

@ -108,7 +108,7 @@ public interface Callback extends Invocable
}
/**
* Create a callback from the passed success and failure
* Creates a callback from the passed success and failure
*
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
@ -133,7 +133,7 @@ public interface Callback extends Invocable
}
/**
* Create a callback that runs completed when it succeeds or fails
* Creates a callback that runs completed when it succeeds or fails
*
* @param completed The completion to run on success or failure
* @return a new callback
@ -150,7 +150,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback that runs completed after
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
@ -169,7 +169,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback that runs completed before
* Creates a nested callback that runs completed before
* completing the nested callback.
*
* @param callback The nested callback
@ -212,7 +212,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback which always fails the nested callback on completion.
* Creates a nested callback which always fails the nested callback on completion.
*
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
@ -239,7 +239,7 @@ public interface Callback extends Invocable
}
/**
* Create a callback which combines two other callbacks and will succeed or fail them both.
* Creates a callback which combines two other callbacks and will succeed or fail them both.
* @param callback1 The first callback
* @param callback2 The second callback
* @return a new callback.
@ -397,14 +397,14 @@ public interface Callback extends Invocable
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* Create a completable future given a callback.
* Creates a completable future given a callback.
*
* @param callback The nested callback.
* @return a new Completable which will succeed this callback when completed.
*/
public static Completable from(Callback callback)
{
return new Completable()
return new Completable(callback.getInvocationType())
{
@Override
public void succeeded()

View File

@ -17,7 +17,7 @@ import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.BufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
public class ByteArrayMessageSink extends AbstractMessageSink
{
private static final byte[] EMPTY_BUFFER = new byte[0];
private BufferCallbackAccumulator out;
private ByteBufferCallbackAccumulator out;
public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
@ -77,7 +77,7 @@ public class ByteArrayMessageSink extends AbstractMessageSink
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new BufferCallbackAccumulator();
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

View File

@ -18,7 +18,7 @@ import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.eclipse.jetty.io.BufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -29,7 +29,7 @@ import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
public class ByteBufferMessageSink extends AbstractMessageSink
{
private BufferCallbackAccumulator out;
private ByteBufferCallbackAccumulator out;
public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle)
{
@ -75,7 +75,7 @@ public class ByteBufferMessageSink extends AbstractMessageSink
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new BufferCallbackAccumulator();
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

View File

@ -151,11 +151,15 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
}
else
{
frameCallback = Callback.from(() ->
frameCallback = new Callback.Nested(callback)
{
callback.succeeded();
session.demand(1);
}, callback::failed);
@Override
public void succeeded()
{
session.demand(1);
super.succeeded();
}
};
}
typeSink.accept(frame, frameCallback);

View File

@ -225,25 +225,16 @@ public class JettyWebSocketFrameHandler implements FrameHandler
}
}
// Demand after succeeding any received frame
Callback demandingCallback = Callback.from(() ->
{
demand();
callback.succeeded();
},
callback::failed
);
switch (frame.getOpCode())
{
case OpCode.CLOSE:
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPingFrame(frame, demandingCallback);
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPongFrame(frame, demandingCallback);
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
onTextFrame(frame, callback);
@ -381,7 +372,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
ByteBuffer payload = BufferUtil.copy(frame.getPayload());
getSession().getRemote().sendPong(payload, WriteCallback.NOOP);
}
callback.succeeded();
demand();
}
private void onPongFrame(Frame frame, Callback callback)
@ -401,7 +394,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
throw new WebSocketException(endpointInstance.getClass().getSimpleName() + " PONG method error: " + cause.getMessage(), cause);
}
}
callback.succeeded();
demand();
}
private void onTextFrame(Frame frame, Callback callback)