Issue #207 - stabilizing testing of Dispatched MessageSinks

This commit is contained in:
Joakim Erdfelt 2017-05-11 06:05:37 -07:00
parent 8c1d7ed8d5
commit 04afed1338
3 changed files with 77 additions and 21 deletions

View File

@ -0,0 +1,48 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
public class CompletableFutureFrameCallback extends CompletableFuture<FrameCallback> implements FrameCallback
{
private static final Logger LOG = Log.getLogger(CompletableFutureFrameCallback.class);
@Override
public void fail(Throwable cause)
{
if(LOG.isDebugEnabled())
LOG.debug("fail()", cause);
completeExceptionally(cause);
}
@Override
public void succeed()
{
if(LOG.isDebugEnabled())
LOG.debug("succeed()");
complete(this);
}
}

View File

@ -37,6 +37,7 @@ import javax.websocket.EndpointConfig;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions; import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.io.CompletableFutureFrameCallback;
import org.eclipse.jetty.websocket.common.io.FutureFrameCallback; import org.eclipse.jetty.websocket.common.io.FutureFrameCallback;
import org.eclipse.jetty.websocket.jsr356.JsrSession; import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.messages.DecodedReaderMessageSink; import org.eclipse.jetty.websocket.jsr356.messages.DecodedReaderMessageSink;
@ -112,16 +113,17 @@ public class DecoderReaderMessageSinkTest
FutureFrameCallback callback1 = new FutureFrameCallback(); FutureFrameCallback callback1 = new FutureFrameCallback();
FutureFrameCallback callback2 = new FutureFrameCallback(); FutureFrameCallback callback2 = new FutureFrameCallback();
FutureFrameCallback callback3 = new FutureFrameCallback(); CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback();
sink.accept(new TextFrame().setPayload("Hello.\n").setFin(false), callback1); sink.accept(new TextFrame().setPayload("Hello.\n").setFin(false), callback1);
sink.accept(new ContinuationFrame().setPayload("Is this thing on?\n").setFin(false), callback2); sink.accept(new ContinuationFrame().setPayload("Is this thing on?\n").setFin(false), callback2);
sink.accept(new ContinuationFrame().setPayload("Please reply\n").setFin(true), callback3); sink.accept(new ContinuationFrame().setPayload("Please reply\n").setFin(true), finCallback);
finCallback.get(1, TimeUnit.SECONDS); // wait for fin
Lines lines = futureLines.get(1, TimeUnit.SECONDS); Lines lines = futureLines.get(1, TimeUnit.SECONDS);
assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("Callback3.done", callback3.isDone(), is(true)); assertThat("FinCallback.done", finCallback.isDone(), is(true));
assertThat("Lines.size", lines.size(), is(3)); assertThat("Lines.size", lines.size(), is(3));
assertThat("Lines[0]", lines.get(0), is("Hello.")); assertThat("Lines[0]", lines.get(0), is("Hello."));
assertThat("Lines[1]", lines.get(1), is("Is this thing on?")); assertThat("Lines[1]", lines.get(1), is("Is this thing on?"));

View File

@ -33,6 +33,9 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Function; import java.util.function.Function;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.io.CompletableFutureFrameCallback;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FutureFrameCallback; import org.eclipse.jetty.websocket.common.io.FutureFrameCallback;
@ -42,6 +45,7 @@ import org.junit.Test;
public class ReaderMessageSinkTest public class ReaderMessageSinkTest
{ {
private static final Logger LOG = Log.getLogger(ReaderMessageSinkTest.class);
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()); new LinkedBlockingQueue<>());
@ -54,45 +58,47 @@ public class ReaderMessageSinkTest
@Test @Test
public void testReader_SingleFrame() throws InterruptedException, ExecutionException, TimeoutException public void testReader_SingleFrame() throws InterruptedException, ExecutionException, TimeoutException
{ {
CompletableFuture<StringWriter> futureWriter = new CompletableFuture<>(); CompletableFuture<StringWriter> copyFuture = new CompletableFuture<>();
ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(futureWriter)); ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(copyFuture));
CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback();
sink.accept(new TextFrame().setPayload("Hello World"), finCallback);
FutureFrameCallback callback1 = new FutureFrameCallback(); finCallback.get(1, TimeUnit.SECONDS); // wait for callback
sink.accept(new TextFrame().setPayload("Hello World"), callback1); StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("FinCallback.done", finCallback.isDone(), is(true));
StringWriter writer = futureWriter.get(1, TimeUnit.SECONDS);
assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World")); assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World"));
} }
@Test @Test
public void testReader_MultiFrame() throws InterruptedException, ExecutionException, TimeoutException public void testReader_MultiFrame() throws InterruptedException, ExecutionException, TimeoutException
{ {
CompletableFuture<StringWriter> futureWriter = new CompletableFuture<>(); CompletableFuture<StringWriter> copyFuture = new CompletableFuture<>();
ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(futureWriter)); ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(copyFuture));
FutureFrameCallback callback1 = new FutureFrameCallback(); FutureFrameCallback callback1 = new FutureFrameCallback();
FutureFrameCallback callback2 = new FutureFrameCallback(); FutureFrameCallback callback2 = new FutureFrameCallback();
FutureFrameCallback callback3 = new FutureFrameCallback(); CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback();
sink.accept(new TextFrame().setPayload("Hello").setFin(false), callback1); sink.accept(new TextFrame().setPayload("Hello").setFin(false), callback1);
sink.accept(new ContinuationFrame().setPayload(", ").setFin(false), callback2); sink.accept(new ContinuationFrame().setPayload(", ").setFin(false), callback2);
sink.accept(new ContinuationFrame().setPayload("World").setFin(true), callback3); sink.accept(new ContinuationFrame().setPayload("World").setFin(true), finCallback);
StringWriter writer = futureWriter.get(1, TimeUnit.SECONDS); finCallback.get(1, TimeUnit.SECONDS); // wait for fin callback
StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS);
assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback1.done", callback1.isDone(), is(true));
assertThat("Callback2.done", callback2.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true));
assertThat("Callback3.done", callback3.isDone(), is(true)); assertThat("finCallback.done", finCallback.isDone(), is(true));
assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World")); assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World"));
} }
private class ReaderCopy implements Function<Reader, Void> private class ReaderCopy implements Function<Reader, Void>
{ {
private CompletableFuture<StringWriter> futureWriter; private CompletableFuture<StringWriter> copyFuture;
public ReaderCopy(CompletableFuture<StringWriter> futureWriter) public ReaderCopy(CompletableFuture<StringWriter> copyFuture)
{ {
this.futureWriter = futureWriter; this.copyFuture = copyFuture;
} }
@Override @Override
@ -102,11 +108,11 @@ public class ReaderMessageSinkTest
{ {
StringWriter writer = new StringWriter(); StringWriter writer = new StringWriter();
IO.copy(reader, writer); IO.copy(reader, writer);
futureWriter.complete(writer); copyFuture.complete(writer);
} }
catch (IOException e) catch (IOException e)
{ {
futureWriter.completeExceptionally(e); copyFuture.completeExceptionally(e);
} }
return null; return null;
} }