From 7c53c317ae830d4e518ebe50428028a26aef733c Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 28 Jan 2013 17:32:22 +0100 Subject: [PATCH] Fixed DeferredContentProvider race condition. HttpSender was setting the listener for asynchronous content before its own state was properly setup. This was causing race conditions, where a thread could notify HttpSender and find null data members causing later NPEs. Now the listener is set after the state is setup, removing the race condition. --- .../jetty/client/AsyncContentProvider.java | 4 +- .../org/eclipse/jetty/client/HttpSender.java | 34 +++-- .../client/util/DeferredContentProvider.java | 68 +++++---- .../jetty/client/HttpClientStreamTest.java | 132 ++++++++++++++++++ 4 files changed, 198 insertions(+), 40 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java index 4c0c6c3345c..a45c881978c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncContentProvider.java @@ -39,9 +39,7 @@ public interface AsyncContentProvider extends ContentProvider { /** * Callback method invoked when content is available - * - * @param last whether it is the last notification of content availability */ - public void onContent(boolean last); + public void onContent(); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 67115fdb81b..1575d20c37b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -64,7 +64,7 @@ public class HttpSender implements AsyncContentProvider.Listener } @Override - public void onContent(boolean last) + public void onContent() { while (true) { @@ -134,13 +134,16 @@ public class HttpSender implements AsyncContentProvider.Listener requestNotifier.notifyBegin(request); ContentProvider content = request.getContent(); - if (content instanceof AsyncContentProvider) - ((AsyncContentProvider)content).setListener(this); - this.contentIterator = content == null ? Collections.emptyIterator() : content.iterator(); boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE); assert updated; + + // Setting the listener may trigger calls to onContent() by other + // threads so we must set it only after the state has been updated + if (content instanceof AsyncContentProvider) + ((AsyncContentProvider)content).setListener(this); + send(); } } @@ -308,8 +311,7 @@ public class HttpSender implements AsyncContentProvider.Listener { if (updateSendState(currentSendState, SendState.EXECUTE)) { - // TODO: reload the chunk ? - LOG.debug("??? content for {}", request); + LOG.debug("Deferred content available for {}", request); break out; } break; @@ -338,8 +340,24 @@ public class HttpSender implements AsyncContentProvider.Listener { if (generator.isEnd()) { - if (!updateSendState(SendState.EXECUTE, SendState.IDLE)) - throw new IllegalStateException(); + out: while (true) + { + currentSendState = sendState.get(); + switch (currentSendState) + { + case EXECUTE: + case SCHEDULE: + { + if (!updateSendState(currentSendState, SendState.IDLE)) + throw new IllegalStateException(); + break out; + } + default: + { + throw new IllegalStateException(); + } + } + } success(); } return; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java index e43452147da..64d0e77c504 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java @@ -20,8 +20,10 @@ package org.eclipse.jetty.client.util; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.AsyncContentProvider; import org.eclipse.jetty.client.api.ContentProvider; @@ -72,9 +74,11 @@ import org.eclipse.jetty.client.api.Response; */ public class DeferredContentProvider implements AsyncContentProvider, AutoCloseable { - private final Queue queue = new ConcurrentLinkedQueue<>(); - private volatile Listener listener; - private volatile boolean closed; + private static final ByteBuffer CLOSE = ByteBuffer.allocate(0); + + private final Queue chunks = new ConcurrentLinkedQueue<>(); + private final AtomicReference listener = new AtomicReference<>(); + private final Iterator iterator = new DeferredContentProviderIterator(); /** * Creates a new {@link DeferredContentProvider} with the given initial content @@ -84,13 +88,14 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea public DeferredContentProvider(ByteBuffer... buffers) { for (ByteBuffer buffer : buffers) - queue.offer(buffer); + chunks.offer(buffer); } @Override public void setListener(Listener listener) { - this.listener = listener; + if (!this.listener.compareAndSet(null, listener)) + throw new IllegalStateException(); } @Override @@ -108,8 +113,8 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea */ public boolean offer(ByteBuffer buffer) { - boolean result = queue.offer(buffer); - notifyListener(false); + boolean result = chunks.offer(buffer); + notifyListener(); return result; } @@ -119,39 +124,44 @@ public class DeferredContentProvider implements AsyncContentProvider, AutoClosea */ public void close() { - closed = true; - notifyListener(true); + chunks.offer(CLOSE); + notifyListener(); } - private void notifyListener(boolean last) + private void notifyListener() { - Listener listener = this.listener; + Listener listener = this.listener.get(); if (listener != null) - listener.onContent(last); + listener.onContent(); } @Override public Iterator iterator() { - return new Iterator() + return iterator; + } + + private class DeferredContentProviderIterator implements Iterator + { + @Override + public boolean hasNext() { - @Override - public boolean hasNext() - { - return !queue.isEmpty() || !closed; - } + return chunks.peek() != CLOSE; + } - @Override - public ByteBuffer next() - { - return queue.poll(); - } + @Override + public ByteBuffer next() + { + ByteBuffer element = chunks.poll(); + if (element == CLOSE) + throw new NoSuchElementException(); + return element; + } - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 3bd46cf3273..856172a20b8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -30,9 +30,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -41,11 +44,13 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -373,4 +378,131 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest } Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final byte[] data = new byte[512]; + final DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public void setListener(Listener listener) + { + super.setListener(listener); + // Simulate a concurrent call + offer(ByteBuffer.wrap(data)); + close(); + } + }; + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && + result.getResponse().getStatus() == 200 && + Arrays.equals(data, getContent())) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testUploadWithDeferredContentProviderRacingWithIterator() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final byte[] data = new byte[512]; + final AtomicReference contentRef = new AtomicReference<>(); + final DeferredContentProvider content = new DeferredContentProvider() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + // Data for the deferred content iterator: + // [0] => deferred + // [1] => deferred + // [2] => data + private final byte[][] iteratorData = new byte[3][]; + private final AtomicInteger index = new AtomicInteger(); + + { + iteratorData[0] = null; + iteratorData[1] = null; + iteratorData[2] = data; + } + + @Override + public boolean hasNext() + { + return index.get() < iteratorData.length; + } + + @Override + public ByteBuffer next() + { + byte[] chunk = iteratorData[index.getAndIncrement()]; + ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk); + if (index.get() == 2) + { + contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result); + contentRef.get().close(); + } + return result; + } + + @Override + public void remove() + { + } + }; + } + }; + contentRef.set(content); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && + result.getResponse().getStatus() == 200 && + Arrays.equals(data, getContent())) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } }