Issue #1047 - ReadPendingException and then thread death.

Fixed by calling tryFillInterested() rather than fillInterested() to
cope with the race between reads scheduling read interest and
setWriteListener() that also executes code in
HttpChannelState.unhandle() that wants to schedule read interest.
This commit is contained in:
Simone Bordet 2017-02-07 22:23:27 +01:00
parent c0d261d7aa
commit 369c73ab45
2 changed files with 146 additions and 7 deletions

View File

@ -284,7 +284,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
int filled = fillRequestBuffer();
handled = parseRequestBuffer();
if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
if (handled || filled<=0 || _input.hasContent())
break;
}
return handled;
@ -398,7 +398,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
else if (_parser.inContentState() && _generator.isPersistent())
{
// If we are async, then we have problems to complete neatly
if (_channel.getRequest().getHttpInput().isAsync())
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed async input {}", this);
@ -409,7 +409,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this);
// Complete reading the request
if (!_channel.getRequest().getHttpInput().consumeAll())
if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
@ -546,7 +546,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
getEndPoint().tryFillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
@ -621,7 +621,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
if (fillAndParseForContent())
_channel.handle();
else if (!_input.isFinished())
else if (!_input.isFinished() && !_input.hasContent())
asyncReadFillInterested();
}

View File

@ -18,14 +18,20 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -48,6 +54,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@ -129,7 +136,7 @@ public class AsyncIOServletTest extends AbstractTest
scope.set(null);
}
private void sleep(long ms) throws IOException
private void sleep(long ms)
{
try
{
@ -137,7 +144,7 @@ public class AsyncIOServletTest extends AbstractTest
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
throw new UncheckedIOException(new InterruptedIOException());
}
}
@ -1042,4 +1049,136 @@ public class AsyncIOServletTest extends AbstractTest
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testWriteListenerFromOtherThread() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new Listener(asyncContext));
}
});
int cores = 4;
int iterations = 10;
CountDownLatch latch = new CountDownLatch(cores * iterations);
Deque<Throwable> failures = new LinkedBlockingDeque<>();
for (int i = 0; i < cores; ++i)
{
client.getExecutor().execute(() ->
{
for (int j = 0; j < iterations; ++j)
{
try
{
ContentResponse response = client.newRequest(newURI())
.method(HttpMethod.POST)
.path(servletPath)
.content(new InputStreamContentProvider(new ByteArrayInputStream(new byte[16 * 1024])
{
@Override
public int read(byte[] b, int off, int len)
{
sleep(5);
return super.read(b, off, Math.min(len, 4242));
}
}))
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
latch.countDown();
}
catch (Throwable x)
{
failures.offer(x);
}
}
});
}
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
Assert.assertTrue(failures.isEmpty());
}
private class Listener implements ReadListener, WriteListener
{
private final Executor executor = Executors.newFixedThreadPool(32);
private final CompletableFuture<?> inputComplete = new CompletableFuture<>();
private final CompletableFuture<?> outputComplete = new CompletableFuture<>();
private final AtomicBoolean responseWritten = new AtomicBoolean();
private final AsyncContext asyncContext;
private final HttpServletResponse response;
private final ServletInputStream input;
private final ServletOutputStream output;
public Listener(AsyncContext asyncContext) throws IOException
{
this.asyncContext = asyncContext;
this.response = (HttpServletResponse)asyncContext.getResponse();
this.input = asyncContext.getRequest().getInputStream();
this.output = response.getOutputStream();
CompletableFuture.allOf(inputComplete, outputComplete)
.whenComplete((ignoredResult, ignoredThrowable) -> asyncContext.complete());
// Dispatch setting the write listener to another thread.
executor.execute(() -> output.setWriteListener(this));
}
@Override
public void onDataAvailable() throws IOException
{
byte[] buffer = new byte[16 * 1024];
while (input.isReady())
{
if (input.read(buffer) < 0)
return;
}
}
@Override
public void onAllDataRead() throws IOException
{
inputComplete.complete(null);
}
@Override
public void onWritePossible() throws IOException
{
// Dispatch OWP to another thread.
executor.execute(() ->
{
while (output.isReady())
{
if (responseWritten.compareAndSet(false, true))
{
try
{
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType("text/plain;charset=utf-8");
output.write("Hello world".getBytes());
}
catch (IOException x)
{
throw new UncheckedIOException(x);
}
}
else
{
outputComplete.complete(null);
return;
}
}
});
}
@Override
public void onError(Throwable t)
{
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
asyncContext.complete();
}
}
}