449603 - OutputStreamContentProvider hangs when host is not available.

Fixed by having HttpRequest.abort() to fail the ContentProvider.
In this way, the ContentProvider knows, even before being used to
send the request content, that the request has failed, and may
forward the failure to the client code.
This commit is contained in:
Simone Bordet 2014-11-03 16:50:59 +01:00
parent cbd7b75e10
commit 9c64fb73e4
5 changed files with 231 additions and 28 deletions

View File

@ -674,7 +674,13 @@ public class HttpRequest implements Request
@Override
public boolean abort(Throwable cause)
{
return aborted.compareAndSet(null, Objects.requireNonNull(cause)) && conversation.abort(cause);
if (aborted.compareAndSet(null, Objects.requireNonNull(cause)))
{
if (content instanceof Callback)
((Callback)content).failed(cause);
return conversation.abort(cause);
}
return false;
}
@Override

View File

@ -22,7 +22,9 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
@ -30,6 +32,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -80,14 +85,14 @@ import org.eclipse.jetty.util.Callback;
* }
* </pre>
*/
public class DeferredContentProvider implements AsyncContentProvider, Closeable
public class DeferredContentProvider implements AsyncContentProvider, Callback, Closeable
{
private static final AsyncChunk CLOSE = new AsyncChunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
private final Object lock = this;
private final Queue<AsyncChunk> chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final Iterator<ByteBuffer> iterator = new DeferredContentProviderIterator();
private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
private int size;
private Throwable failure;
@ -136,18 +141,33 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
private boolean offer(AsyncChunk chunk)
{
boolean result;
Throwable failure;
boolean result = false;
synchronized (lock)
{
result = chunks.offer(chunk);
if (result && chunk != CLOSE)
++size;
failure = this.failure;
if (failure == null)
{
result = chunks.offer(chunk);
if (result && chunk != CLOSE)
++size;
}
}
if (result)
if (failure != null)
chunk.callback.failed(failure);
else if (result)
notifyListener();
return result;
}
private void clear()
{
synchronized (lock)
{
chunks.clear();
}
}
public void flush() throws IOException
{
synchronized (lock)
@ -180,6 +200,17 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
offer(CLOSE);
}
@Override
public void succeeded()
{
}
@Override
public void failed(Throwable failure)
{
iterator.failed(failure);
}
private void notifyListener()
{
Listener listener = this.listener.get();
@ -244,14 +275,17 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
@Override
public void failed(Throwable x)
{
AsyncChunk chunk;
List<AsyncChunk> chunks = new ArrayList<>();
synchronized (lock)
{
chunk = current;
failure = x;
// Transfer all chunks to fail them all.
chunks.addAll(DeferredContentProvider.this.chunks);
clear();
current = null;
lock.notify();
}
if (chunk != null)
for (AsyncChunk chunk : chunks)
chunk.callback.failed(x);
}
}

View File

@ -27,6 +27,7 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -50,10 +51,11 @@ import org.eclipse.jetty.util.log.Logger;
* consumed (or when an exception is thrown while reading it), unless otherwise specified
* to the {@link #InputStreamContentProvider(java.io.InputStream, int, boolean) constructor}.
*/
public class InputStreamContentProvider implements ContentProvider
public class InputStreamContentProvider implements ContentProvider, Callback, Closeable
{
private static final Logger LOG = Log.getLogger(InputStreamContentProvider.class);
private final InputStreamContentProviderIterator iterator = new InputStreamContentProviderIterator();
private final InputStream stream;
private final int bufferSize;
private final boolean autoClose;
@ -115,7 +117,35 @@ public class InputStreamContentProvider implements ContentProvider
@Override
public Iterator<ByteBuffer> iterator()
{
return new InputStreamIterator();
return iterator;
}
@Override
public void close()
{
if (autoClose)
{
try
{
stream.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
}
}
@Override
public void succeeded()
{
}
@Override
public void failed(Throwable failure)
{
// TODO: forward the failure to the iterator.
close();
}
/**
@ -133,7 +163,7 @@ public class InputStreamContentProvider implements ContentProvider
* Therefore we need to make sure that {@link #hasNext()} does not perform any side effect (so that
* it can be called multiple times) until {@link #next()} is called.
*/
private class InputStreamIterator implements Iterator<ByteBuffer>, Closeable
private class InputStreamContentProviderIterator implements Iterator<ByteBuffer>, Closeable
{
private Throwable failure;
private ByteBuffer buffer;
@ -227,17 +257,7 @@ public class InputStreamContentProvider implements ContentProvider
@Override
public void close()
{
if (autoClose)
{
try
{
stream.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
}
InputStreamContentProvider.this.close();
}
}
}

View File

@ -18,12 +18,17 @@
package org.eclipse.jetty.client.util;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.Callback;
/**
* A {@link ContentProvider} that provides content asynchronously through an {@link OutputStream}
@ -68,7 +73,7 @@ import org.eclipse.jetty.client.AsyncContentProvider;
* }
* </pre>
*/
public class OutputStreamContentProvider implements AsyncContentProvider
public class OutputStreamContentProvider implements AsyncContentProvider, Callback, Closeable
{
private final DeferredContentProvider deferred = new DeferredContentProvider();
private final OutputStream output = new DeferredOutputStream();
@ -101,11 +106,24 @@ public class OutputStreamContentProvider implements AsyncContentProvider
deferred.offer(buffer);
}
protected void close()
@Override
public void close()
{
deferred.close();
}
@Override
public void succeeded()
{
deferred.succeeded();
}
@Override
public void failed(Throwable failure)
{
deferred.failed(failure);
}
private class DeferredOutputStream extends OutputStream
{
@Override

View File

@ -26,6 +26,7 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -884,7 +885,7 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
}
@Test
public void testBigUploadWithOutputFromInputStream() throws Exception
public void testBigUploadWithOutputStreamFromInputStream() throws Exception
{
start(new AbstractHandler()
{
@ -933,6 +934,130 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@Test
public void testUploadWithOutputStreamFailureToConnect() throws Exception
{
start(new EmptyServerHandler());
final byte[] data = new byte[512];
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
try (OutputStream output = content.getOutputStream())
{
output.write(data);
Assert.fail();
}
catch (IOException x)
{
// Expected
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderFailsMultipleOffers() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch failLatch = new CountDownLatch(2);
final Callback.Adapter callback = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
failLatch.countDown();
}
};
final CountDownLatch completeLatch = new CountDownLatch(1);
final DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.onRequestBegin(new Request.BeginListener()
{
@Override
public void onBegin(Request request)
{
content.offer(ByteBuffer.wrap(new byte[256]), callback);
content.offer(ByteBuffer.wrap(new byte[256]), callback);
request.abort(new Exception("explicitly_thrown_by_test"));
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(failLatch.await(5, TimeUnit.SECONDS));
// Make sure that adding more content results in the callback to be failed.
final CountDownLatch latch = new CountDownLatch(1);
content.offer(ByteBuffer.wrap(new byte[128]), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithConnectFailureClosesStream() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8))
{
@Override
public void close() throws IOException
{
super.close();
closeLatch.countDown();
}
};
InputStreamContentProvider content = new InputStreamContentProvider(stream);
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
completeLatch.countDown();
}
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithWriteFailureClosesStream() throws Exception
{