From 12e2f9e6c80cef39619bd196122134b7ab7df83a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 Feb 2015 13:10:11 +0100 Subject: [PATCH] 459542 - AsyncMiddleManServlet race condition on first download content. Fixed the race condition by submitting a zero length buffer to write from onWritePossible() which will succeed the callback without causing races. --- .../eclipse/jetty/client/HttpReceiver.java | 16 +- .../client/util/DeferredContentProvider.java | 34 +-- .../jetty/proxy/AsyncMiddleManServlet.java | 205 +++++++++--------- .../eclipse/jetty/proxy/ProxyServletTest.java | 2 +- .../eclipse/jetty/util/CountingCallback.java | 94 ++++++++ 5 files changed, 226 insertions(+), 125 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index b8c40b26913..ff00322d1be 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -335,17 +336,10 @@ public abstract class HttpReceiver } else { - Callback partial = new Callback.Adapter() - { - @Override - public void failed(Throwable x) - { - callback.failed(x); - } - }; - - for (int i = 1, size = decodeds.size(); i <= size; ++i) - notifier.notifyContent(listeners, response, decodeds.get(i - 1), i < size ? partial : callback); + int size = decodeds.size(); + CountingCallback counter = new CountingCallback(callback, size); + for (int i = 0; i < size; ++i) + notifier.notifyContent(listeners, response, decodeds.get(i), counter); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java index 43ec5748053..0ce217a6602 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java @@ -87,10 +87,10 @@ import org.eclipse.jetty.util.Callback; */ public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable { - private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE); + private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE); private final Object lock = this; - private final Queue chunks = new ArrayQueue<>(4, 64, lock); + private final Queue chunks = new ArrayQueue<>(4, 64, lock); private final AtomicReference listener = new AtomicReference<>(); private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -121,7 +121,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, synchronized (lock) { long total = 0; - for (AsyncChunk chunk : chunks) + for (Chunk chunk : chunks) total += chunk.buffer.remaining(); length = total; } @@ -148,10 +148,10 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, public boolean offer(ByteBuffer buffer, Callback callback) { - return offer(new AsyncChunk(buffer, callback)); + return offer(new Chunk(buffer, callback)); } - private boolean offer(AsyncChunk chunk) + private boolean offer(Chunk chunk) { Throwable failure; boolean result = false; @@ -243,7 +243,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, private class DeferredContentProviderIterator implements Iterator, Callback { - private AsyncChunk current; + private Chunk current; @Override public boolean hasNext() @@ -259,7 +259,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, { synchronized (lock) { - AsyncChunk chunk = current = chunks.poll(); + Chunk chunk = current = chunks.poll(); if (chunk == CLOSE) throw new NoSuchElementException(); return chunk == null ? null : chunk.buffer; @@ -275,7 +275,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, @Override public void succeeded() { - AsyncChunk chunk; + Chunk chunk; synchronized (lock) { chunk = current; @@ -292,7 +292,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, @Override public void failed(Throwable x) { - List chunks = new ArrayList<>(); + List chunks = new ArrayList<>(); synchronized (lock) { failure = x; @@ -302,20 +302,26 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback, current = null; lock.notify(); } - for (AsyncChunk chunk : chunks) + for (Chunk chunk : chunks) chunk.callback.failed(x); } } - private static class AsyncChunk + public static class Chunk { - private final ByteBuffer buffer; - private final Callback callback; + public final ByteBuffer buffer; + public final Callback callback; - private AsyncChunk(ByteBuffer buffer, Callback callback) + public Chunk(ByteBuffer buffer, Callback callback) { this.buffer = Objects.requireNonNull(buffer); this.callback = Objects.requireNonNull(callback); } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } } } diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java index 0bee62ac665..12bec96de18 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +50,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.IteratingCallback; public class AsyncMiddleManServlet extends AbstractProxyServlet @@ -171,14 +171,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet protected class ProxyReader extends IteratingCallback implements ReadListener { - private final Callback failer = new Adapter() - { - @Override - public void failed(Throwable x) - { - onError(x); - } - }; private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()]; private final List buffers = new ArrayList<>(); private final HttpServletRequest clientRequest; @@ -207,7 +199,16 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet public void onAllDataRead() throws IOException { if (!provider.isClosed()) - process(BufferUtil.EMPTY_BUFFER, failer, true); + { + process(BufferUtil.EMPTY_BUFFER, new Adapter() + { + @Override + public void failed(Throwable x) + { + onError(x); + } + }, true); + } if (_log.isDebugEnabled()) _log.debug("{} proxying content to upstream completed", getRequestId(clientRequest)); @@ -264,37 +265,43 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet clientRequest.setAttribute(CLIENT_TRANSFORMER, transformer); } - if (content.hasRemaining() || finished) + if (!content.hasRemaining() && !finished) { - int contentBytes = content.remaining(); + callback.succeeded(); + return; + } - transformer.transform(content, finished, buffers); + int contentBytes = content.remaining(); + transformer.transform(content, finished, buffers); - int newContentBytes = 0; - int size = buffers.size(); + int newContentBytes = 0; + int size = buffers.size(); + if (size > 0) + { + CountingCallback counter = new CountingCallback(callback, size); for (int i = 0; i < size; ++i) { ByteBuffer buffer = buffers.get(i); newContentBytes += buffer.remaining(); - provider.offer(buffer, i == size - 1 ? callback : failer); + provider.offer(buffer, counter); } buffers.clear(); - - if (finished) - provider.close(); - - if (_log.isDebugEnabled()) - _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes); - - if (!committed) - { - proxyRequest.header(HttpHeader.CONTENT_LENGTH, null); - sendProxyRequest(clientRequest, proxyResponse, proxyRequest); - } - - if (size == 0) - succeeded(); } + + if (finished) + provider.close(); + + if (_log.isDebugEnabled()) + _log.debug("{} upstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes); + + if (!committed) + { + proxyRequest.header(HttpHeader.CONTENT_LENGTH, null); + sendProxyRequest(clientRequest, proxyResponse, proxyRequest); + } + + if (size == 0) + succeeded(); } @Override @@ -368,19 +375,26 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet int newContentBytes = 0; int size = buffers.size(); - for (int i = 0; i < size; ++i) + if (size > 0) { - ByteBuffer buffer = buffers.get(i); - newContentBytes += buffer.remaining(); - proxyWriter.offer(buffer, i == size - 1 ? callback : Callback.Adapter.INSTANCE); + CountingCallback counter = new CountingCallback(callback, size); + for (int i = 0; i < size; ++i) + { + ByteBuffer buffer = buffers.get(i); + newContentBytes += buffer.remaining(); + proxyWriter.offer(buffer, counter); + } + buffers.clear(); } - buffers.clear(); if (_log.isDebugEnabled()) _log.debug("{} downstream content transformation {} -> {} bytes", getRequestId(clientRequest), contentBytes, newContentBytes); if (committed) { - proxyWriter.onWritePossible(); + if (size == 0) + callback.succeeded(); + else + proxyWriter.onWritePossible(); } else { @@ -389,11 +403,15 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet // Setting the WriteListener triggers an invocation to // onWritePossible(), possibly on a different thread. + // We cannot succeed the callback from here, otherwise + // we run into a race where the different thread calls + // onWritePossible() and succeeding the callback causes + // this method to be called again, which also may call + // onWritePossible(). We use a poison pill for this case. + if (size == 0) + proxyWriter.offer(BufferUtil.EMPTY_BUFFER, callback); proxyResponse.getOutputStream().setWriteListener(proxyWriter); } - - if (size == 0) - callback.succeeded(); } catch (Throwable x) { @@ -417,11 +435,9 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet long newContentBytes = 0; int size = buffers.size(); - for (int i = 0; i < size; ++i) + if (size > 0) { - ByteBuffer buffer = buffers.get(i); - newContentBytes += buffer.remaining(); - proxyWriter.offer(buffer, i == size - 1 ? new Callback.Adapter() + Callback callback = new Callback.Adapter() { @Override public void failed(Throwable x) @@ -429,9 +445,15 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (complete.compareAndSet(false, true)) onProxyResponseFailure(clientRequest, proxyResponse, serverResponse, x); } - } : Callback.Adapter.INSTANCE); + }; + for (int i = 0; i < size; ++i) + { + ByteBuffer buffer = buffers.get(i); + newContentBytes += buffer.remaining(); + proxyWriter.offer(buffer, callback); + } + buffers.clear(); } - buffers.clear(); if (_log.isDebugEnabled()) _log.debug("{} downstream content transformation to {} bytes", getRequestId(clientRequest), newContentBytes); @@ -462,10 +484,10 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet protected class ProxyWriter implements WriteListener { - private final Queue chunks = new ArrayDeque<>(); + private final Queue chunks = new ArrayDeque<>(); private final HttpServletRequest clientRequest; private final Response serverResponse; - private AsyncChunk chunk; + private DeferredContentProvider.Chunk chunk; private boolean writePending; protected ProxyWriter(HttpServletRequest clientRequest, Response serverResponse) @@ -478,48 +500,48 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet { if (_log.isDebugEnabled()) _log.debug("{} proxying content to downstream: {} bytes", getRequestId(clientRequest), content.remaining()); - return chunks.offer(new AsyncChunk(content, callback)); + return chunks.offer(new DeferredContentProvider.Chunk(content, callback)); } @Override public void onWritePossible() throws IOException { ServletOutputStream output = clientRequest.getAsyncContext().getResponse().getOutputStream(); - while (true) + + // If we had a pending write, let's succeed it. + if (writePending) { - if (writePending) - { - // The write was pending but is now complete. - writePending = false; - if (_log.isDebugEnabled()) - _log.debug("{} pending async write complete of {} bytes on {}", getRequestId(clientRequest), chunk.length, output); - if (succeed(chunk.callback)) - break; - } - else - { - chunk = chunks.poll(); - if (chunk == null) - break; - - writeProxyResponseContent(output, chunk.buffer); - - if (output.isReady()) - { - if (_log.isDebugEnabled()) - _log.debug("{} async write complete of {} bytes on {}", getRequestId(clientRequest), chunk.length, output); - if (succeed(chunk.callback)) - break; - } - else - { - writePending = true; - if (_log.isDebugEnabled()) - _log.debug("{} async write pending of {} bytes on {}", getRequestId(clientRequest), chunk.length, output); - break; - } - } + if (_log.isDebugEnabled()) + _log.debug("{} pending async write complete of {} on {}", getRequestId(clientRequest), chunk, output); + writePending = false; + if (succeed(chunk.callback)) + return; } + + int length = 0; + DeferredContentProvider.Chunk chunk = null; + while (output.isReady()) + { + if (chunk != null) + { + if (_log.isDebugEnabled()) + _log.debug("{} async write complete of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output); + if (succeed(chunk.callback)) + return; + } + + this.chunk = chunk = chunks.poll(); + if (chunk == null) + return; + + length = chunk.buffer.remaining(); + if (length > 0) + writeProxyResponseContent(output, chunk.buffer); + } + + if (_log.isDebugEnabled()) + _log.debug("{} async write pending of {} ({} bytes) on {}", getRequestId(clientRequest), chunk, length, output); + writePending = true; } private boolean succeed(Callback callback) @@ -533,7 +555,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet // which may remain pending, which means that the reentrant call // to onWritePossible() returns all the way back to just after the // succeed of the callback. There, we cannot just loop attempting - // write, but we need to check whether we are still write pending. + // write, but we need to check whether we are write pending. callback.succeeded(); return writePending; } @@ -541,7 +563,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet @Override public void onError(Throwable failure) { - AsyncChunk chunk = this.chunk; + DeferredContentProvider.Chunk chunk = this.chunk; if (chunk != null) chunk.callback.failed(failure); else @@ -549,21 +571,6 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet } } - // TODO: coalesce this class with the one from DeferredContentProvider ? - private static class AsyncChunk - { - private final ByteBuffer buffer; - private final Callback callback; - private final int length; - - private AsyncChunk(ByteBuffer buffer, Callback callback) - { - this.buffer = Objects.requireNonNull(buffer); - this.callback = Objects.requireNonNull(callback); - this.length = buffer.remaining(); - } - } - /** *

Allows applications to transform upstream and downstream content.

*

Typical use cases of transformations are URL rewriting of HTML anchors diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java index da4548f81af..e1cd32178ea 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletTest.java @@ -99,7 +99,7 @@ public class ProxyServletTest { private static final String PROXIED_HEADER = "X-Proxied"; - @Parameterized.Parameters + @Parameterized.Parameters(name = "{0}") public static Iterable data() { return Arrays.asList(new Object[][]{ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java new file mode 100644 index 00000000000..d997fde8157 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/CountingCallback.java @@ -0,0 +1,94 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + *

A callback wrapper that succeeds the wrapped callback when the count is + * reached, or on first failure.

+ *

This callback is particularly useful when an async operation is split + * into multiple parts, for example when an original byte buffer that needs + * to be written, along with a callback, is split into multiple byte buffers, + * since it allows the original callback to be wrapped and notified only when + * the last part has been processed.

+ *

Example:

+ *
+ * public void process(EndPoint endPoint, ByteBuffer buffer, Callback callback)
+ * {
+ *     ByteBuffer[] buffers = split(buffer);
+ *     CountCallback countCallback = new CountCallback(callback, buffers.length);
+ *     endPoint.write(countCallback, buffers);
+ * }
+ * 
+ */ +public class CountingCallback implements Callback +{ + private final Callback callback; + private final AtomicInteger count; + + public CountingCallback(Callback callback, int count) + { + this.callback = callback; + this.count = new AtomicInteger(count); + } + + @Override + public void succeeded() + { + // Forward success on the last success. + while (true) + { + int current = count.get(); + + // Already completed ? + if (current == 0) + return; + + if (count.compareAndSet(current, current - 1)) + { + if (current == 1) + { + callback.succeeded(); + return; + } + } + } + } + + @Override + public void failed(Throwable failure) + { + // Forward failure on the first failure. + while (true) + { + int current = count.get(); + + // Already completed ? + if (current == 0) + return; + + if (count.compareAndSet(current, 0)) + { + callback.failed(failure); + return; + } + } + } +}