diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
index 828ceede06d..237b835d62f 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
@@ -58,7 +58,7 @@ import org.eclipse.jetty.util.log.Logger;
* {@link HttpContent} may not have content, if the related {@link ContentProvider} is {@code null}, and this
* is reflected by {@link #hasContent()}.
*
- * {@link HttpContent} may have {@link DeferredContentProvider deferred content}, in which case {@link #advance()}
+ * {@link HttpContent} may have {@link AsyncContentProvider deferred content}, in which case {@link #advance()}
* moves the cursor to a position that provides {@code null} {@link #getByteBuffer() buffer} and
* {@link #getContent() content}. When the deferred content is available, a further call to {@link #advance()}
* will move the cursor to a position that provides non {@code null} buffer and content.
@@ -124,7 +124,34 @@ public class HttpContent implements Callback, Closeable
*/
public boolean advance()
{
- if (isLast())
+ boolean advanced;
+ boolean hasNext;
+ ByteBuffer bytes;
+ if (iterator instanceof Synchronizable)
+ {
+ synchronized (((Synchronizable)iterator).getLock())
+ {
+ advanced = iterator.hasNext();
+ bytes = advanced ? iterator.next() : null;
+ hasNext = advanced && iterator.hasNext();
+ }
+ }
+ else
+ {
+ advanced = iterator.hasNext();
+ bytes = advanced ? iterator.next() : null;
+ hasNext = advanced && iterator.hasNext();
+ }
+
+ if (advanced)
+ {
+ buffer = bytes;
+ content = bytes == null ? null : bytes.slice();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes);
+ return bytes != null;
+ }
+ else
{
if (content != AFTER)
{
@@ -134,14 +161,6 @@ public class HttpContent implements Callback, Closeable
}
return false;
}
- else
- {
- ByteBuffer buffer = this.buffer = iterator.next();
- if (LOG.isDebugEnabled())
- LOG.debug("Advanced content to {} chunk {}", isLast() ? "last" : "next", buffer);
- content = buffer == null ? null : buffer.slice();
- return buffer != null;
- }
}
/**
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 72eca44f238..8b515344d61 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -144,6 +144,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case EXPECTING_WITH_CONTENT:
case PROCEEDING_WITH_CONTENT:
case WAITING:
+ case COMPLETED:
+ case FAILED:
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {}", current);
@@ -151,7 +153,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
- throw illegalSenderState(current);
+ illegalSenderState(current);
+ return;
}
}
}
@@ -169,8 +172,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
SenderState newSenderState = SenderState.SENDING;
if (expects100Continue(request))
newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
- if (!updateSenderState(SenderState.IDLE, newSenderState))
- throw illegalSenderState(SenderState.IDLE);
+
+ out: while (true)
+ {
+ SenderState current = senderState.get();
+ switch (current)
+ {
+ case IDLE:
+ case COMPLETED:
+ {
+ if (updateSenderState(current, newSenderState))
+ break out;
+ break;
+ }
+ default:
+ {
+ illegalSenderState(current);
+ return;
+ }
+ }
+ }
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated
@@ -424,16 +445,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
protected void reset()
{
+ HttpContent content = this.content;
+ this.content = null;
content.close();
- content = null;
- senderState.set(SenderState.IDLE);
+ senderState.set(SenderState.COMPLETED);
}
protected void dispose()
{
HttpContent content = this.content;
+ this.content = null;
if (content != null)
content.close();
+ senderState.set(SenderState.FAILED);
}
public void proceed(HttpExchange exchange, Throwable failure)
@@ -482,16 +506,23 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case WAITING:
{
// We received the 100 Continue, now send the content if any.
- if (!updateSenderState(current, SenderState.SENDING))
- throw illegalSenderState(current);
- if (LOG.isDebugEnabled())
- LOG.debug("Proceeding while waiting");
- contentCallback.iterate();
+ if (updateSenderState(current, SenderState.SENDING))
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Proceeding while waiting");
+ contentCallback.iterate();
+ return;
+ }
+ break;
+ }
+ case FAILED:
+ {
return;
}
default:
{
- throw illegalSenderState(current);
+ illegalSenderState(current);
+ return;
}
}
}
@@ -518,9 +549,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return updated;
}
- private RuntimeException illegalSenderState(SenderState current)
+ private void illegalSenderState(SenderState current)
{
- return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
+ anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
}
@Override
@@ -609,7 +640,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
*/
- PROCEEDING_WITH_CONTENT
+ PROCEEDING_WITH_CONTENT,
+ /**
+ * {@link HttpSender} has finished to send the request
+ */
+ COMPLETED,
+ /**
+ * {@link HttpSender} has failed to send the request
+ */
+ FAILED
}
private class CommitCallback implements Callback
@@ -619,11 +658,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
try
{
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
content.succeeded();
process();
}
- // Catch-all for runtime exceptions
- catch (Exception x)
+ catch (Throwable x)
{
anyToFailure(x);
}
@@ -632,6 +673,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void failed(Throwable failure)
{
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
content.failed(failure);
anyToFailure(failure);
}
@@ -646,7 +690,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!headersToCommit(request))
return;
- HttpContent content = HttpSender.this.content;
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
if (!content.hasContent())
{
@@ -709,9 +755,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
updateSenderState(current, SenderState.SENDING);
break;
}
+ case FAILED:
+ {
+ return;
+ }
default:
{
- throw illegalSenderState(current);
+ illegalSenderState(current);
+ return;
}
}
}
@@ -729,6 +780,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return Action.IDLE;
+
while (true)
{
boolean advanced = content.advance();
@@ -748,7 +802,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
}
- SenderState current = HttpSender.this.senderState.get();
+ SenderState current = senderState.get();
switch (current)
{
case SENDING:
@@ -768,7 +822,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
- throw illegalSenderState(current);
+ illegalSenderState(current);
+ return Action.IDLE;
}
}
}
@@ -777,15 +832,24 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
- ByteBuffer buffer = content.getContent();
- someToContent(getHttpExchange().getRequest(), buffer);
+ HttpExchange exchange = getHttpExchange();
+ if (exchange == null)
+ return;
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
content.succeeded();
+ ByteBuffer buffer = content.getContent();
+ someToContent(exchange.getRequest(), buffer);
super.succeeded();
}
@Override
public void onCompleteFailure(Throwable failure)
{
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
content.failed(failure);
anyToFailure(failure);
}
@@ -803,16 +867,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
- content.succeeded();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
+ content.succeeded();
someToSuccess(exchange);
}
@Override
public void failed(Throwable failure)
{
+ HttpContent content = HttpSender.this.content;
+ if (content == null)
+ return;
content.failed(failure);
anyToFailure(failure);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/Synchronizable.java b/jetty-client/src/main/java/org/eclipse/jetty/client/Synchronizable.java
new file mode 100644
index 00000000000..068d6bb0699
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/Synchronizable.java
@@ -0,0 +1,45 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+/**
+ *
Implementations of this interface expose a lock object
+ * via {@link #getLock()} so that callers can synchronize
+ * externally on that lock:
+ *
+ * if (iterator instanceof Synchronizable)
+ * {
+ * Object element = null;
+ * synchronized (((Synchronizable)iterator).getLock())
+ * {
+ * if (iterator.hasNext())
+ * element = iterator.next();
+ * }
+ * }
+ *
+ *
In the example above, the calls to {@code hasNext()} and
+ * {@code next()} are performed "atomically".
+ */
+public interface Synchronizable
+{
+ /**
+ * @return the lock object to synchronize on
+ */
+ public Object getLock();
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
index be9714dde1e..f9aa729afbe 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/DeferredContentProvider.java
@@ -27,11 +27,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
+import org.eclipse.jetty.client.Synchronizable;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@@ -90,7 +90,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
private final Object lock = this;
- private final Queue chunks = new ArrayQueue<>(4, 64, lock);
+ private final ArrayQueue chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference listener = new AtomicReference<>();
private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -241,7 +241,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
return iterator;
}
- private class DeferredContentProviderIterator implements Iterator, Callback
+ private class DeferredContentProviderIterator implements Iterator, Callback, Synchronizable
{
private Chunk current;
@@ -261,7 +261,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
{
Chunk chunk = current = chunks.poll();
if (chunk == CLOSE)
+ {
+ // Slow path: reinsert the CLOSE chunk
+ // so that hasNext() works correctly.
+ chunks.add(0, CLOSE);
throw new NoSuchElementException();
+ }
return chunk == null ? null : chunk.buffer;
}
}
@@ -308,6 +313,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
for (Chunk chunk : chunks)
chunk.callback.failed(x);
}
+
+ @Override
+ public Object getLock()
+ {
+ return lock;
+ }
}
public static class Chunk
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/util/DeferredContentProviderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/util/DeferredContentProviderTest.java
new file mode 100644
index 00000000000..b78fb930991
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/util/DeferredContentProviderTest.java
@@ -0,0 +1,156 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client.util;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.eclipse.jetty.util.Callback;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeferredContentProviderTest
+{
+ private ExecutorService executor;
+
+ @Before
+ public void prepare() throws Exception
+ {
+ executor = Executors.newCachedThreadPool();
+ }
+
+ @After
+ public void dispose() throws Exception
+ {
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testWhenEmptyFlushDoesNotBlock() throws Exception
+ {
+ final DeferredContentProvider provider = new DeferredContentProvider();
+
+ Future> task = executor.submit(new Callable