diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index 5f499bd0c9f..f98f4828942 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.client.http; import java.io.EOFException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpExchange; @@ -34,7 +33,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CompletableCallback; public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler { @@ -226,27 +225,22 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (exchange == null) return false; - final AtomicBoolean completed = new AtomicBoolean(); - Callback callback = new Callback() + CompletableCallback callback = new CompletableCallback() { @Override - public void succeeded() + public void resume() { - if (!completed.compareAndSet(false, true)) - { - LOG.debug("Content consumed asynchronously, resuming processing"); - process(); - } + LOG.debug("Content consumed asynchronously, resuming processing"); + process(); } - @Override - public void failed(Throwable x) + public void abort(Throwable x) { failAndClose(x); } }; responseContent(exchange, buffer, callback); - return completed.compareAndSet(false, true); + return callback.tryComplete(); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java index 56fcd1a9220..4ca0db19ccf 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.client; import java.io.IOException; -import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -86,7 +85,7 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest } }); - Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS)); + Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); Callback callback = callbackRef.get(); // Wait a while to be sure that the parsing does not proceed. @@ -99,7 +98,7 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest contentLatch.set(new CountDownLatch(1)); callback.succeeded(); - Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS)); + Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); callback = callbackRef.get(); // Wait a while to be sure that the parsing does not proceed. @@ -116,12 +115,4 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); Assert.assertEquals(2, contentCount.get()); } - - public void test() throws Exception - { - try (Socket socket = new Socket()) - { - System.out.println("socket = " + socket); - } - } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java new file mode 100644 index 00000000000..ec5a30a84e6 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java @@ -0,0 +1,100 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A callback to be used by driver code that needs to know whether the callback has been + * succeeded or failed (that is, completed) just after the asynchronous operation or not, + * typically because further processing depends on the callback being completed. + * The driver code competes with the asynchronous operation to complete the callback. + *

+ * If the callback is already completed, the driver code continues the processing, + * otherwise it suspends it. If it is suspended, the callback will be completed some time + * later, and {@link #resume()} or {@link #abort(Throwable)} will be called to allow the + * application to resume the processing. + *

+ * Typical usage: + *

+ * CompletableCallback callback = new CompletableCallback()
+ * {
+ *     @Override
+ *     public void resume()
+ *     {
+ *         // continue processing
+ *     }
+ *
+ *     @Override
+ *     public void abort(Throwable failure)
+ *     {
+ *         // abort processing
+ *     }
+ * }
+ * asyncOperation(callback);
+ * boolean completed = callback.tryComplete();
+ * if (completed)
+ *     // suspend processing, async operation not done yet
+ * else
+ *     // continue processing, async operation already done
+ * 
+ */ +public abstract class CompletableCallback implements Callback +{ + private final AtomicBoolean completed = new AtomicBoolean(); + + @Override + public void succeeded() + { + if (!tryComplete()) + resume(); + } + + @Override + public void failed(Throwable x) + { + if (!tryComplete()) + abort(x); + } + + /** + * Callback method invoked when this callback is succeeded + * after a first call to {@link #tryComplete()}. + */ + public abstract void resume(); + + /** + * Callback method invoked when this callback is failed + * after a first call to {@link #tryComplete()}. + */ + public abstract void abort(Throwable failure); + + /** + * Tries to complete this callback; driver code should call + * this method once after the asynchronous operation + * to detect whether the asynchronous operation has already + * completed or not. + * + * @return whether the attempt to complete was successful. + */ + public boolean tryComplete() + { + return completed.compareAndSet(false, true); + } +}