Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-11.0.x

This commit is contained in:
Lachlan Roberts 2021-08-31 10:22:03 +10:00
commit bbbd799673
23 changed files with 358 additions and 147 deletions

View File

@ -0,0 +1,99 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*/
public class ByteBufferCallbackAccumulator
{
private final List<Entry> _entries = new ArrayList<>();
private int _length;
private static class Entry
{
private final ByteBuffer buffer;
private final Callback callback;
Entry(ByteBuffer buffer, Callback callback)
{
this.buffer = buffer;
this.callback = callback;
}
}
public void addEntry(ByteBuffer buffer, Callback callback)
{
_entries.add(new Entry(buffer, callback));
_length = Math.addExact(_length, buffer.remaining());
}
/**
* @return the total length of the content in the accumulator.
*/
public int getLength()
{
return _length;
}
/**
* @return a newly allocated byte array containing all content written into the accumulator.
*/
public byte[] takeByteArray()
{
int length = getLength();
if (length == 0)
return new byte[0];
byte[] bytes = new byte[length];
ByteBuffer buffer = BufferUtil.toBuffer(bytes);
BufferUtil.clear(buffer);
writeTo(buffer);
return bytes;
}
public void writeTo(ByteBuffer buffer)
{
if (BufferUtil.space(buffer) < _length)
throw new IllegalArgumentException("not enough buffer space remaining");
int pos = BufferUtil.flipToFill(buffer);
for (Entry entry : _entries)
{
buffer.put(entry.buffer);
entry.callback.succeeded();
}
BufferUtil.flipToFlush(buffer, pos);
_entries.clear();
_length = 0;
}
public void fail(Throwable t)
{
for (Entry entry : _entries)
{
entry.callback.failed(t);
}
_entries.clear();
_length = 0;
}
}

View File

@ -152,7 +152,7 @@ public interface Callback extends Invocable
}
/**
* Creaste a callback that runs completed when it succeeds or fails
* Creates a callback that runs completed when it succeeds or fails
*
* @param completed The completion to run on success or failure
* @return a new callback
@ -169,7 +169,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback that runs completed after
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
@ -188,7 +188,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback that runs completed before
* Creates a nested callback that runs completed before
* completing the nested callback.
*
* @param callback The nested callback
@ -231,7 +231,7 @@ public interface Callback extends Invocable
}
/**
* Create a nested callback which always fails the nested callback on completion.
* Creates a nested callback which always fails the nested callback on completion.
*
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
@ -258,7 +258,7 @@ public interface Callback extends Invocable
}
/**
* Create a callback which combines two other callbacks and will succeed or fail them both.
* Creates a callback which combines two other callbacks and will succeed or fail them both.
* @param callback1 The first callback
* @param callback2 The second callback
* @return a new callback.
@ -411,6 +411,32 @@ public interface Callback extends Invocable
*/
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* Creates a completable future given a callback.
*
* @param callback The nested callback.
* @return a new Completable which will succeed this callback when completed.
*/
public static Completable from(Callback callback)
{
return new Completable(callback.getInvocationType())
{
@Override
public void succeeded()
{
callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
callback.failed(x);
super.failed(x);
}
};
}
private final InvocationType invocation;
public Completable()

View File

@ -110,6 +110,11 @@ public interface CoreSession extends OutgoingFrames, Configuration
*/
SocketAddress getRemoteAddress();
/**
* @return True if the websocket is open inbound
*/
boolean isInputOpen();
/**
* @return True if the websocket is open outbound
*/
@ -253,6 +258,12 @@ public interface CoreSession extends OutgoingFrames, Configuration
return null;
}
@Override
public boolean isInputOpen()
{
return true;
}
@Override
public boolean isOutputOpen()
{

View File

@ -183,6 +183,12 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
return getConnection().getEndPoint().getRemoteSocketAddress();
}
@Override
public boolean isInputOpen()
{
return sessionState.isInputOpen();
}
@Override
public boolean isOutputOpen()
{
@ -416,8 +422,10 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
{
if (!demanding)
throw new IllegalStateException("FrameHandler is not demanding: " + this);
if (!sessionState.isInputOpen())
throw new IllegalStateException("FrameHandler input not open: " + this);
connection.demand(n);
}

View File

@ -13,12 +13,11 @@
package org.eclipse.jetty.websocket.core.internal.messages;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -29,9 +28,7 @@ import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
public class ByteArrayMessageSink extends AbstractMessageSink
{
private static final byte[] EMPTY_BUFFER = new byte[0];
private static final int BUFFER_SIZE = 65535;
private ByteArrayOutputStream out;
private int size;
private ByteBufferCallbackAccumulator out;
public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
@ -51,12 +48,12 @@ public class ByteArrayMessageSink extends AbstractMessageSink
{
try
{
size += frame.getPayloadLength();
long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d",
size, maxBinaryMessageSize));
throw new MessageTooLargeException(
String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize));
}
// If we are fin and no OutputStream has been created we don't need to aggregate.
@ -71,19 +68,33 @@ public class ByteArrayMessageSink extends AbstractMessageSink
methodHandle.invoke(EMPTY_BUFFER, 0, 0);
callback.succeeded();
session.demand(1);
return;
}
aggregatePayload(frame);
// Aggregate the frame payload.
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}
// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
byte[] buf = out.toByteArray();
byte[] buf = out.takeByteArray();
methodHandle.invoke(buf, 0, buf.length);
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
callback.failed(t);
}
finally
@ -92,19 +103,7 @@ public class ByteArrayMessageSink extends AbstractMessageSink
{
// reset
out = null;
size = 0;
}
}
}
private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
}
}
}

View File

@ -13,13 +13,13 @@
package org.eclipse.jetty.websocket.core.internal.messages;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
@ -29,9 +29,7 @@ import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
public class ByteBufferMessageSink extends AbstractMessageSink
{
private static final int BUFFER_SIZE = 65535;
private ByteArrayOutputStream out;
private int size;
private ByteBufferCallbackAccumulator out;
public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle)
{
@ -51,7 +49,7 @@ public class ByteBufferMessageSink extends AbstractMessageSink
{
try
{
size += frame.getPayloadLength();
long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength();
long maxBinaryMessageSize = session.getMaxBinaryMessageSize();
if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize)
{
@ -68,41 +66,51 @@ public class ByteBufferMessageSink extends AbstractMessageSink
methodHandle.invoke(BufferUtil.EMPTY_BUFFER);
callback.succeeded();
session.demand(1);
return;
}
aggregatePayload(frame);
if (frame.isFin())
methodHandle.invoke(ByteBuffer.wrap(out.toByteArray()));
// Aggregate the frame payload.
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}
callback.succeeded();
// If the methodHandle throws we don't want to fail callback twice.
callback = Callback.NOOP;
if (frame.isFin())
{
ByteBufferPool bufferPool = session.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(out.getLength(), false);
out.writeTo(buffer);
try
{
methodHandle.invoke(buffer);
}
finally
{
bufferPool.release(buffer);
}
}
session.demand(1);
}
catch (Throwable t)
{
if (out != null)
out.fail(t);
callback.failed(t);
}
finally
{
if (frame.isFin())
{
// reset
out = null;
size = 0;
}
}
}
private void aggregatePayload(Frame frame) throws IOException
{
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new ByteArrayOutputStream(BUFFER_SIZE);
BufferUtil.writeTo(payload, out);
payload.position(payload.limit()); // consume buffer
}
}
}

View File

@ -135,22 +135,32 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
});
}
Callback frameCallback = callback;
Callback frameCallback;
if (frame.isFin())
{
// This is the final frame we should wait for the frame callback and the dispatched thread.
Callback.Completable completableCallback = new Callback.Completable();
frameCallback = completableCallback;
CompletableFuture.allOf(dispatchComplete, completableCallback).whenComplete((aVoid, throwable) ->
Callback.Completable finComplete = Callback.Completable.from(callback);
frameCallback = finComplete;
CompletableFuture.allOf(dispatchComplete, finComplete).whenComplete((aVoid, throwable) ->
{
typeSink = null;
dispatchComplete = null;
if (throwable != null)
callback.failed(throwable);
else
callback.succeeded();
if (throwable == null)
session.demand(1);
});
}
else
{
frameCallback = new Callback.Nested(callback)
{
@Override
public void succeeded()
{
super.succeeded();
session.demand(1);
}
};
}
typeSink.accept(frame, frameCallback);
}

View File

@ -41,6 +41,7 @@ public class PartialByteArrayMessageSink extends AbstractMessageSink
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{

View File

@ -35,6 +35,7 @@ public class PartialByteBufferMessageSink extends AbstractMessageSink
methodHandle.invoke(frame.getPayload(), frame.isFin());
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{

View File

@ -51,6 +51,7 @@ public class PartialStringMessageSink extends AbstractMessageSink
}
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{

View File

@ -53,6 +53,7 @@ public class StringMessageSink extends AbstractMessageSink
methodHandle.invoke(out.toString());
callback.succeeded();
session.demand(1);
}
catch (Throwable t)
{

View File

@ -178,6 +178,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onJakartaWebSocketSessionOpened(session));
callback.succeeded();
coreSession.demand(1);
}
catch (Throwable cause)
{
@ -321,6 +322,12 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
}
}
@Override
public boolean isDemanding()
{
return true;
}
public Set<MessageHandler> getMessageHandlers()
{
return messageHandlerMap.values().stream()
@ -591,6 +598,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
ByteBuffer payload = BufferUtil.copy(frame.getPayload());
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(payload), Callback.NOOP, false);
callback.succeeded();
coreSession.demand(1);
}
public void onPong(Frame frame, Callback callback)
@ -613,6 +621,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
}
}
callback.succeeded();
coreSession.demand(1);
}
public void onText(Frame frame, Callback callback)

View File

@ -13,38 +13,35 @@
package org.eclipse.jetty.websocket.jakarta.common;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class AbstractSessionTest
{
protected static JakartaWebSocketSession session;
protected static JakartaWebSocketContainer container;
protected static WebSocketComponents components;
protected static JakartaWebSocketContainer container = new DummyContainer();
protected static WebSocketComponents components = new WebSocketComponents();
protected static TestCoreSession coreSession = new TestCoreSession();
@BeforeAll
public static void initSession() throws Exception
{
container = new DummyContainer();
container.start();
components = new WebSocketComponents();
components.start();
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
JakartaWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
CoreSession coreSession = new CoreSession.Empty()
{
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
};
session = new JakartaWebSocketSession(container, coreSession, frameHandler, container.getFrameHandlerFactory()
.newDefaultEndpointConfig(websocketPojo.getClass()));
}
@ -56,6 +53,34 @@ public abstract class AbstractSessionTest
container.stop();
}
public static class TestCoreSession extends CoreSession.Empty
{
private final Semaphore demand = new Semaphore(0);
@Override
public WebSocketComponents getWebSocketComponents()
{
return components;
}
@Override
public ByteBufferPool getByteBufferPool()
{
return components.getBufferPool();
}
public void waitForDemand(long timeout, TimeUnit timeUnit) throws InterruptedException
{
assertTrue(demand.tryAcquire(timeout, timeUnit));
}
@Override
public void demand(long n)
{
demand.release();
}
}
public static class DummyEndpoint extends Endpoint
{
@Override

View File

@ -20,15 +20,12 @@ import java.util.function.Consumer;
import jakarta.websocket.ClientEndpointConfig;
import jakarta.websocket.Decoder;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.jakarta.common.AbstractSessionTest;
import org.eclipse.jetty.websocket.jakarta.common.JakartaWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.jakarta.common.decoders.RegisteredDecoder;
public abstract class AbstractMessageSinkTest extends AbstractSessionTest
{
private final WebSocketComponents _components = new WebSocketComponents();
public List<RegisteredDecoder> toRegisteredDecoderList(Class<? extends Decoder> clazz, Class<?> objectType)
{
Class<? extends Decoder> interfaceType;
@ -43,7 +40,7 @@ public abstract class AbstractMessageSinkTest extends AbstractSessionTest
else
throw new IllegalStateException();
return List.of(new RegisteredDecoder(clazz, interfaceType, objectType, ClientEndpointConfig.Builder.create().build(), _components));
return List.of(new RegisteredDecoder(clazz, interfaceType, objectType, ClientEndpointConfig.Builder.create().build(), components));
}
public <T> MethodHandle getAcceptHandle(Consumer<T> copy, Class<T> type)

View File

@ -56,11 +56,11 @@ public class DecodedBinaryMessageSinkTest extends AbstractMessageSinkTest
data.put((byte)31);
data.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -89,16 +89,18 @@ public class DecodedBinaryMessageSinkTest extends AbstractMessageSinkTest
data3.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
}
private String format(Calendar cal, String formatPattern)

View File

@ -58,11 +58,11 @@ public class DecodedBinaryStreamMessageSinkTest extends AbstractMessageSinkTest
data.put((byte)31);
data.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -91,16 +91,17 @@ public class DecodedBinaryStreamMessageSinkTest extends AbstractMessageSinkTest
data3.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
}
private String format(Calendar cal, String formatPattern)

View File

@ -51,11 +51,11 @@ public class DecodedTextMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -72,16 +72,17 @@ public class DecodedTextMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
}
private String format(Date date, String formatPattern)

View File

@ -54,11 +54,11 @@ public class DecodedTextStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -75,16 +75,17 @@ public class DecodedTextStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
}
private String format(Date date, String formatPattern)

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -51,10 +52,11 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
ByteBuffer data = BufferUtil.toBuffer("Hello World", UTF_8);
sink.accept(new Frame(OpCode.BINARY).setPayload(data), finCallback);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS);
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello World"));
}
@Test
@ -68,19 +70,22 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
ByteBuffer data1 = BufferUtil.toBuffer("Hello World", UTF_8);
sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(true), fin1Callback);
fin1Callback.get(1, TimeUnit.SECONDS); // wait for callback (can't sent next message until this callback finishes)
// wait for demand (can't sent next message until a new frame is demanded)
coreSession.waitForDemand(1, TimeUnit.SECONDS);
fin1Callback.get(1, TimeUnit.SECONDS);
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World"));
assertThat("FinCallback.done", fin1Callback.isDone(), is(true));
assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello World"));
FutureCallback fin2Callback = new FutureCallback();
ByteBuffer data2 = BufferUtil.toBuffer("Greetings Earthling", UTF_8);
sink.accept(new Frame(OpCode.BINARY).setPayload(data2).setFin(true), fin2Callback);
fin2Callback.get(1, TimeUnit.SECONDS); // wait for callback
coreSession.waitForDemand(1, TimeUnit.SECONDS);
fin2Callback.get(1, TimeUnit.SECONDS);
byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings Earthling"));
assertThat("FinCallback.done", fin2Callback.isDone(), is(true));
assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Greetings Earthling"));
}
@Test
@ -95,16 +100,17 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.BINARY).setPayload("Hello").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello, World"));
assertThat("callback1.done", callback1.isDone(), is(true));
assertThat("callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Hello, World"));
}
@Test
@ -120,18 +126,20 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.BINARY).setPayload("Greetings").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("Earthling").setFin(false), callback3);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(new byte[0]).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(5, TimeUnit.SECONDS); // wait for callback
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings, Earthling"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("Callback3.done", callback3.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", new String(byteStream.toByteArray(), UTF_8), is("Greetings, Earthling"));
}
public static class InputStreamCopy implements Consumer<InputStream>
@ -156,9 +164,9 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
}
}
public ByteArrayOutputStream poll(long time, TimeUnit unit) throws InterruptedException, ExecutionException
public ByteArrayOutputStream poll(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return streams.poll(time, unit).get();
return Objects.requireNonNull(streams.poll(time, unit)).get(time, unit);
}
}
}

View File

@ -45,11 +45,11 @@ public class ReaderMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("Hello World"), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -65,15 +65,17 @@ public class ReaderMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for fin callback
StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World"));
}
public static class ReaderCopy implements Consumer<Reader>

View File

@ -52,5 +52,10 @@
<artifactId>websocket-core-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -82,6 +82,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private WebSocketSession session;
private SuspendState state = SuspendState.DEMANDING;
private Runnable delayedOnFrame;
private CoreSession coreSession;
public JettyWebSocketFrameHandler(WebSocketContainer container,
Object endpointInstance,
@ -150,6 +151,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
try
{
customizer.customize(coreSession);
this.coreSession = coreSession;
session = new WebSocketSession(container, coreSession, this);
if (!session.isOpen())
throw new IllegalStateException("Session is not open");
@ -223,43 +225,25 @@ public class JettyWebSocketFrameHandler implements FrameHandler
}
}
// Demand after succeeding any received frame
Callback demandingCallback = Callback.from(() ->
{
try
{
demand();
}
catch (Throwable t)
{
callback.failed(t);
return;
}
callback.succeeded();
},
callback::failed
);
switch (frame.getOpCode())
{
case OpCode.CLOSE:
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPingFrame(frame, demandingCallback);
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPongFrame(frame, demandingCallback);
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
onTextFrame(frame, demandingCallback);
onTextFrame(frame, callback);
break;
case OpCode.BINARY:
onBinaryFrame(frame, demandingCallback);
onBinaryFrame(frame, callback);
break;
case OpCode.CONTINUATION:
onContinuationFrame(frame, demandingCallback);
onContinuationFrame(frame, callback);
break;
default:
callback.failed(new IllegalStateException());
@ -342,6 +326,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
if (activeMessageSink == null)
{
callback.succeeded();
demand();
return;
}
@ -387,7 +372,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
ByteBuffer payload = BufferUtil.copy(frame.getPayload());
getSession().getRemote().sendPong(payload, WriteCallback.NOOP);
}
callback.succeeded();
demand();
}
private void onPongFrame(Frame frame, Callback callback)
@ -407,7 +394,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
throw new WebSocketException(endpointInstance.getClass().getSimpleName() + " PONG method error: " + cause.getMessage(), cause);
}
}
callback.succeeded();
demand();
}
private void onTextFrame(Frame frame, Callback callback)

View File

@ -20,6 +20,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
@ -40,6 +42,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
public BlockingQueue<ByteBuffer> binaryMessages = new LinkedBlockingDeque<>();
public BlockingQueue<String> events = new LinkedBlockingDeque<>();
private final ByteBufferPool bufferPool = new NullByteBufferPool();
private final MethodHandle wholeTextHandle;
private final MethodHandle wholeBinaryHandle;
private MessageSink messageSink;
@ -116,16 +119,19 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
if (OpCode.isDataFrame(frame.getOpCode()))
{
messageSink.accept(Frame.copy(frame), callback);
Frame copy = Frame.copy(frame);
messageSink.accept(copy, Callback.from(() -> {}, Throwable::printStackTrace));
if (frame.isFin())
{
messageSink = null;
}
}
else
{
callback.succeeded();
}
callback.succeeded();
}
@Override
public ByteBufferPool getByteBufferPool()
{
return bufferPool;
}
@SuppressWarnings("unused")