430808 - OutputStreamContentProvider violates OutputStream contract.

Fixed OutputStreamContentProvider to perform blocking write() calls and
implemented OutputStream.flush().
HttpSender now notifies the ContentProvider iterator of write completion
and if it implements Callback.
This is used in DeferredContentProvider to provide a blocking flush()
functionality.
This commit is contained in:
Simone Bordet 2014-03-21 21:13:55 +01:00
parent 5c0aae2f12
commit 565d17dc8c
5 changed files with 207 additions and 45 deletions

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -63,7 +64,7 @@ import org.eclipse.jetty.util.log.Logger;
* {@link #getContent() content}. When the deferred content is available, a further call to {@link #advance()}
* will move the cursor to a position that provides non {@code null} buffer and content.
*/
public class HttpContent implements Closeable
public class HttpContent implements Callback, Closeable
{
private static final Logger LOG = Log.getLogger(HttpContent.class);
private static final ByteBuffer AFTER = ByteBuffer.allocate(0);
@ -151,6 +152,20 @@ public class HttpContent implements Closeable
return content == AFTER;
}
@Override
public void succeeded()
{
if (iterator instanceof Callback)
((Callback)iterator).succeeded();
}
@Override
public void failed(Throwable x)
{
if (iterator instanceof Callback)
((Callback)iterator).failed(x);
}
@Override
public void close()
{

View File

@ -609,6 +609,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
try
{
content.succeeded();
process();
}
// Catch-all for runtime exceptions
@ -618,6 +619,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
@Override
public void failed(Throwable failure)
{
content.failed(failure);
anyToFailure(failure);
}
private void process() throws Exception
{
HttpExchange exchange = getHttpExchange();
@ -721,12 +729,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
}
@Override
public void failed(Throwable failure)
{
anyToFailure(failure);
}
}
private class ContentCallback extends IteratingCallback
@ -748,42 +750,56 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
}
if (content.advance())
{
// There is more content to send
sendContent(exchange, content, this);
return Action.SCHEDULED;
}
if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
return Action.SCHEDULED;
}
while (true)
{
boolean advanced = content.advance();
boolean consumed = content.isConsumed();
SenderState current = senderState.get();
switch (current)
{
case SENDING:
{
if (updateSenderState(current, SenderState.IDLE))
if (advanced)
{
LOG.debug("Waiting for deferred content for {}", request);
// There is more content to send
sendContent(exchange, content, this);
return Action.SCHEDULED;
}
else if (consumed)
{
sendContent(exchange, content, lastCallback);
return Action.IDLE;
}
break;
else
{
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
return Action.IDLE;
}
break;
}
}
case SENDING_WITH_CONTENT:
{
if (updateSenderState(current, SenderState.SENDING))
{
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
return Action.SCHEDULED;
if (advanced)
{
sendContent(exchange, content, this);
return Action.SCHEDULED;
}
else if (consumed)
{
sendContent(exchange, content, lastCallback);
return Action.IDLE;
}
else
{
throw new IllegalStateException();
}
}
break;
}
@ -796,18 +812,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
@Override
protected void completed()
public void succeeded()
{
// Nothing to do, since we always return false from process().
// Termination is obtained via LastContentCallback.
content.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable failure)
{
content.failed(failure);
super.failed(failure);
anyToFailure(failure);
}
@Override
protected void completed()
{
// Nothing to do, since we always return false from process().
// Termination is obtained via LastContentCallback.
}
}
private class LastContentCallback implements Callback
@ -815,6 +839,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
content.succeeded();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
@ -824,6 +849,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void failed(Throwable failure)
{
content.failed(failure);
anyToFailure(failure);
}
}

View File

@ -19,11 +19,12 @@
package org.eclipse.jetty.client.util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -31,6 +32,8 @@ import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback;
/**
* A {@link ContentProvider} that allows to add content after {@link Request#send(Response.CompleteListener)}
@ -82,10 +85,13 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
{
private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
private final Object lock = this;
private final Queue<ByteBuffer> chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
private int size;
private Throwable failure;
/**
* Creates a new {@link DeferredContentProvider} with the given initial content
@ -95,7 +101,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
public DeferredContentProvider(ByteBuffer... buffers)
{
for (ByteBuffer buffer : buffers)
chunks.offer(buffer);
offer(buffer);
}
@Override
@ -121,11 +127,40 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
*/
public boolean offer(ByteBuffer buffer)
{
boolean result = chunks.offer(buffer);
notifyListener();
boolean result;
synchronized (lock)
{
result = chunks.offer(buffer);
if (result && buffer != CLOSE)
++size;
}
if (result)
notifyListener();
return result;
}
public void flush() throws IOException
{
synchronized (lock)
{
try
{
while (true)
{
if (failure != null)
throw new IOException(failure);
if (size == 0)
break;
lock.wait();
}
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}
/**
* No more content will be added to this content provider
* and notifies the listener that no more content is available.
@ -133,10 +168,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
public void close()
{
if (closed.compareAndSet(false, true))
{
chunks.offer(CLOSE);
notifyListener();
}
offer(CLOSE);
}
private void notifyListener()
@ -152,21 +184,29 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
return iterator;
}
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
{
private ByteBuffer current;
@Override
public boolean hasNext()
{
return chunks.peek() != CLOSE;
synchronized (lock)
{
return chunks.peek() != CLOSE;
}
}
@Override
public ByteBuffer next()
{
ByteBuffer element = chunks.poll();
if (element == CLOSE)
throw new NoSuchElementException();
return element;
synchronized (lock)
{
ByteBuffer element = current = chunks.poll();
if (element == CLOSE)
throw new NoSuchElementException();
return element;
}
}
@Override
@ -174,5 +214,28 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
{
throw new UnsupportedOperationException();
}
@Override
public void succeeded()
{
synchronized (lock)
{
if (current != null)
{
--size;
lock.notify();
}
}
}
@Override
public void failed(Throwable x)
{
synchronized (lock)
{
failure = x;
lock.notify();
}
}
}
}

View File

@ -121,6 +121,13 @@ public class OutputStreamContentProvider implements AsyncContentProvider
public void write(byte[] b, int off, int len) throws IOException
{
OutputStreamContentProvider.this.write(ByteBuffer.wrap(b, off, len));
flush();
}
@Override
public void flush() throws IOException
{
deferred.flush();
}
@Override

View File

@ -31,6 +31,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -837,6 +838,56 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBigUploadWithOutputFromInputStream() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new BufferingResponseListener(data.length)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
Assert.assertEquals(200, result.getResponse().getStatus());
Assert.assertArrayEquals(data, getContent());
latch.countDown();
}
});
// Make sure we provide the content *after* the request has been "sent".
Thread.sleep(1000);
try (InputStream input = new ByteArrayInputStream(data); OutputStream output = content.getOutputStream())
{
byte[] buffer = new byte[1024];
while (true)
{
int read = input.read(buffer);
if (read < 0)
break;
output.write(buffer, 0, read);
}
}
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@Test
public void testUploadWithWriteFailureClosesStream() throws Exception
{