Fix some more unstable tests.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-08-19 20:43:13 +10:00
parent 026261f482
commit bd396f867b
9 changed files with 47 additions and 40 deletions

View File

@ -75,7 +75,7 @@ public class BufferCallbackAccumulator
for (Iterator<Entry> iterator = _entries.iterator(); iterator.hasNext();)
{
Entry entry = iterator.next();
_length = entry.buffer.remaining();
_length -= entry.buffer.remaining();
buffer.put(entry.buffer);
iterator.remove();
entry.callback.succeeded();

View File

@ -56,11 +56,11 @@ public class DecodedBinaryMessageSinkTest extends AbstractMessageSinkTest
data.put((byte)31);
data.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -89,16 +89,18 @@ public class DecodedBinaryMessageSinkTest extends AbstractMessageSinkTest
data3.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
}
private String format(Calendar cal, String formatPattern)

View File

@ -58,11 +58,11 @@ public class DecodedBinaryStreamMessageSinkTest extends AbstractMessageSinkTest
data.put((byte)31);
data.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("12-31-1999"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -91,16 +91,17 @@ public class DecodedBinaryStreamMessageSinkTest extends AbstractMessageSinkTest
data3.flip();
sink.accept(new Frame(OpCode.BINARY).setPayload(data1).setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data2).setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(data3).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Calendar decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("01-01-2000"));
}
private String format(Calendar cal, String formatPattern)

View File

@ -51,11 +51,11 @@ public class DecodedTextMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -72,16 +72,17 @@ public class DecodedTextMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
}
private String format(Date date, String formatPattern)

View File

@ -54,11 +54,11 @@ public class DecodedTextStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2018.02.13").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("02-13-2018"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -75,16 +75,17 @@ public class DecodedTextStreamMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("2023").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".08").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(".22").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
Date decoded = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Decoded.contents", format(decoded, "MM-dd-yyyy"), is("08-22-2023"));
}
private String format(Date date, String formatPattern)

View File

@ -55,8 +55,8 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS);
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -74,8 +74,8 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
coreSession.waitForDemand(1, TimeUnit.SECONDS);
fin1Callback.get(1, TimeUnit.SECONDS);
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", fin1Callback.isDone(), is(true));
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello World"));
assertThat("FinCallback.done", fin1Callback.isDone(), is(true));
FutureCallback fin2Callback = new FutureCallback();
ByteBuffer data2 = BufferUtil.toBuffer("Greetings Earthling", UTF_8);
@ -84,8 +84,8 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
coreSession.waitForDemand(1, TimeUnit.SECONDS);
fin2Callback.get(1, TimeUnit.SECONDS);
byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", fin2Callback.isDone(), is(true));
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings Earthling"));
assertThat("FinCallback.done", fin2Callback.isDone(), is(true));
}
@Test
@ -101,18 +101,16 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
sink.accept(new Frame(OpCode.BINARY).setPayload("Hello").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("callback1.done", callback1.isDone(), is(true));
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("callback2.done", callback2.isDone(), is(true));
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("finCallback.done", finCallback.isDone(), is(true));
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Hello, World"));
assertThat("callback1.done", callback1.isDone(), is(true));
assertThat("callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -129,22 +127,19 @@ public class InputStreamMessageSinkTest extends AbstractMessageSinkTest
sink.accept(new Frame(OpCode.BINARY).setPayload("Greetings").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("Callback1.done", callback1.isDone(), is(true));
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("Callback2.done", callback2.isDone(), is(true));
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("Earthling").setFin(false), callback3);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("Callback3.done", callback3.isDone(), is(true));
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(new byte[0]).setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
assertThat("finCallback.done", finCallback.isDone(), is(true));
ByteArrayOutputStream byteStream = copy.poll(1, TimeUnit.SECONDS);
assertThat("Writer.contents", byteStream.toString(UTF_8), is("Greetings, Earthling"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("Callback3.done", callback3.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
}
public static class InputStreamCopy implements Consumer<InputStream>

View File

@ -45,11 +45,11 @@ public class ReaderMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("Hello World"), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for callback
StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World"));
assertThat("FinCallback.done", finCallback.isDone(), is(true));
}
@Test
@ -65,15 +65,17 @@ public class ReaderMessageSinkTest extends AbstractMessageSinkTest
FutureCallback finCallback = new FutureCallback();
sink.accept(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), callback1);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload(", ").setFin(false), callback2);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
sink.accept(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), finCallback);
coreSession.waitForDemand(1, TimeUnit.SECONDS);
finCallback.get(1, TimeUnit.SECONDS); // wait for fin callback
StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World"));
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World"));
}
public static class ReaderCopy implements Consumer<Reader>

View File

@ -45,5 +45,10 @@
<artifactId>websocket-core-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -120,7 +120,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
if (OpCode.isDataFrame(frame.getOpCode()))
{
Frame copy = Frame.copy(frame);
messageSink.accept(copy, Callback.NOOP);
messageSink.accept(copy, Callback.from(() -> {}, Throwable::printStackTrace));
if (frame.isFin())
messageSink = null;
}