diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/CompletableFutureFrameCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/CompletableFutureFrameCallback.java new file mode 100644 index 00000000000..78856c72aea --- /dev/null +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/CompletableFutureFrameCallback.java @@ -0,0 +1,48 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.io; + +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.FrameCallback; + +public class CompletableFutureFrameCallback extends CompletableFuture implements FrameCallback +{ + private static final Logger LOG = Log.getLogger(CompletableFutureFrameCallback.class); + + @Override + public void fail(Throwable cause) + { + if(LOG.isDebugEnabled()) + LOG.debug("fail()", cause); + + completeExceptionally(cause); + } + + @Override + public void succeed() + { + if(LOG.isDebugEnabled()) + LOG.debug("succeed()"); + + complete(this); + } +} diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/DecoderReaderMessageSinkTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/DecoderReaderMessageSinkTest.java index e8c548f51f4..d8087caf09f 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/DecoderReaderMessageSinkTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/DecoderReaderMessageSinkTest.java @@ -37,6 +37,7 @@ import javax.websocket.EndpointConfig; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.function.EndpointFunctions; +import org.eclipse.jetty.websocket.common.io.CompletableFutureFrameCallback; import org.eclipse.jetty.websocket.common.io.FutureFrameCallback; import org.eclipse.jetty.websocket.jsr356.JsrSession; import org.eclipse.jetty.websocket.jsr356.messages.DecodedReaderMessageSink; @@ -112,16 +113,17 @@ public class DecoderReaderMessageSinkTest FutureFrameCallback callback1 = new FutureFrameCallback(); FutureFrameCallback callback2 = new FutureFrameCallback(); - FutureFrameCallback callback3 = new FutureFrameCallback(); + CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback(); sink.accept(new TextFrame().setPayload("Hello.\n").setFin(false), callback1); sink.accept(new ContinuationFrame().setPayload("Is this thing on?\n").setFin(false), callback2); - sink.accept(new ContinuationFrame().setPayload("Please reply\n").setFin(true), callback3); + sink.accept(new ContinuationFrame().setPayload("Please reply\n").setFin(true), finCallback); + finCallback.get(1, TimeUnit.SECONDS); // wait for fin Lines lines = futureLines.get(1, TimeUnit.SECONDS); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); - assertThat("Callback3.done", callback3.isDone(), is(true)); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Lines.size", lines.size(), is(3)); assertThat("Lines[0]", lines.get(0), is("Hello.")); assertThat("Lines[1]", lines.get(1), is("Is this thing on?")); diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/ReaderMessageSinkTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/ReaderMessageSinkTest.java index ca073f4cd4d..886b4049b14 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/ReaderMessageSinkTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/ReaderMessageSinkTest.java @@ -33,6 +33,9 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.common.io.CompletableFutureFrameCallback; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.io.FutureFrameCallback; @@ -42,6 +45,7 @@ import org.junit.Test; public class ReaderMessageSinkTest { + private static final Logger LOG = Log.getLogger(ReaderMessageSinkTest.class); private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @@ -54,45 +58,47 @@ public class ReaderMessageSinkTest @Test public void testReader_SingleFrame() throws InterruptedException, ExecutionException, TimeoutException { - CompletableFuture futureWriter = new CompletableFuture<>(); - ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(futureWriter)); + CompletableFuture copyFuture = new CompletableFuture<>(); + ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(copyFuture)); + + CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback(); + sink.accept(new TextFrame().setPayload("Hello World"), finCallback); - FutureFrameCallback callback1 = new FutureFrameCallback(); - sink.accept(new TextFrame().setPayload("Hello World"), callback1); - - StringWriter writer = futureWriter.get(1, TimeUnit.SECONDS); - assertThat("Callback1.done", callback1.isDone(), is(true)); + finCallback.get(1, TimeUnit.SECONDS); // wait for callback + StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS); + assertThat("FinCallback.done", finCallback.isDone(), is(true)); assertThat("Writer.contents", writer.getBuffer().toString(), is("Hello World")); } @Test public void testReader_MultiFrame() throws InterruptedException, ExecutionException, TimeoutException { - CompletableFuture futureWriter = new CompletableFuture<>(); - ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(futureWriter)); + CompletableFuture copyFuture = new CompletableFuture<>(); + ReaderMessageSink sink = new ReaderMessageSink(executor, new ReaderCopy(copyFuture)); FutureFrameCallback callback1 = new FutureFrameCallback(); FutureFrameCallback callback2 = new FutureFrameCallback(); - FutureFrameCallback callback3 = new FutureFrameCallback(); + CompletableFutureFrameCallback finCallback = new CompletableFutureFrameCallback(); sink.accept(new TextFrame().setPayload("Hello").setFin(false), callback1); sink.accept(new ContinuationFrame().setPayload(", ").setFin(false), callback2); - sink.accept(new ContinuationFrame().setPayload("World").setFin(true), callback3); + sink.accept(new ContinuationFrame().setPayload("World").setFin(true), finCallback); - StringWriter writer = futureWriter.get(1, TimeUnit.SECONDS); + finCallback.get(1, TimeUnit.SECONDS); // wait for fin callback + StringWriter writer = copyFuture.get(1, TimeUnit.SECONDS); assertThat("Callback1.done", callback1.isDone(), is(true)); assertThat("Callback2.done", callback2.isDone(), is(true)); - assertThat("Callback3.done", callback3.isDone(), is(true)); + assertThat("finCallback.done", finCallback.isDone(), is(true)); assertThat("Writer contents", writer.getBuffer().toString(), is("Hello, World")); } private class ReaderCopy implements Function { - private CompletableFuture futureWriter; + private CompletableFuture copyFuture; - public ReaderCopy(CompletableFuture futureWriter) + public ReaderCopy(CompletableFuture copyFuture) { - this.futureWriter = futureWriter; + this.copyFuture = copyFuture; } @Override @@ -102,11 +108,11 @@ public class ReaderMessageSinkTest { StringWriter writer = new StringWriter(); IO.copy(reader, writer); - futureWriter.complete(writer); + copyFuture.complete(writer); } catch (IOException e) { - futureWriter.completeExceptionally(e); + copyFuture.completeExceptionally(e); } return null; }