Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2017-02-07 22:31:06 +01:00
commit e01636d109
3 changed files with 146 additions and 10 deletions

View File

@ -88,8 +88,6 @@ public abstract class FillInterest
try
{
if (LOG.isDebugEnabled())
LOG.debug("{} register {}",this,callback);
needsFillInterest();
}
catch (Throwable e)

View File

@ -284,7 +284,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
int filled = fillRequestBuffer();
handled = parseRequestBuffer();
if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
if (handled || filled<=0 || _input.hasContent())
break;
}
return handled;
@ -398,7 +398,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
else if (_parser.inContentState() && _generator.isPersistent())
{
// If we are async, then we have problems to complete neatly
if (_channel.getRequest().getHttpInput().isAsync())
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed async input {}", this);
@ -409,7 +409,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {}", this);
// Complete reading the request
if (!_channel.getRequest().getHttpInput().consumeAll())
if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
@ -551,7 +551,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
getEndPoint().tryFillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
@ -627,7 +627,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
if (fillAndParseForContent())
_channel.handle();
else if (!_input.isFinished())
else if (!_input.isFinished() && !_input.hasContent())
asyncReadFillInterested();
}

View File

@ -18,15 +18,21 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -49,6 +55,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@ -135,7 +142,7 @@ public class AsyncIOServletTest extends AbstractTest
scope.set(null);
}
private void sleep(long ms) throws IOException
private void sleep(long ms)
{
try
{
@ -143,7 +150,7 @@ public class AsyncIOServletTest extends AbstractTest
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
throw new UncheckedIOException(new InterruptedIOException());
}
}
@ -1325,4 +1332,135 @@ public class AsyncIOServletTest extends AbstractTest
}
@Test
public void testWriteListenerFromOtherThread() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
request.getInputStream().setReadListener(new Listener(asyncContext));
}
});
int cores = 4;
int iterations = 10;
CountDownLatch latch = new CountDownLatch(cores * iterations);
Deque<Throwable> failures = new LinkedBlockingDeque<>();
for (int i = 0; i < cores; ++i)
{
client.getExecutor().execute(() ->
{
for (int j = 0; j < iterations; ++j)
{
try
{
ContentResponse response = client.newRequest(newURI())
.method(HttpMethod.POST)
.path(servletPath)
.content(new InputStreamContentProvider(new ByteArrayInputStream(new byte[16 * 1024])
{
@Override
public int read(byte[] b, int off, int len)
{
sleep(5);
return super.read(b, off, Math.min(len, 4242));
}
}))
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
latch.countDown();
}
catch (Throwable x)
{
failures.offer(x);
}
}
});
}
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
Assert.assertTrue(failures.isEmpty());
}
private class Listener implements ReadListener, WriteListener
{
private final Executor executor = Executors.newFixedThreadPool(32);
private final CompletableFuture<?> inputComplete = new CompletableFuture<>();
private final CompletableFuture<?> outputComplete = new CompletableFuture<>();
private final AtomicBoolean responseWritten = new AtomicBoolean();
private final AsyncContext asyncContext;
private final HttpServletResponse response;
private final ServletInputStream input;
private final ServletOutputStream output;
public Listener(AsyncContext asyncContext) throws IOException
{
this.asyncContext = asyncContext;
this.response = (HttpServletResponse)asyncContext.getResponse();
this.input = asyncContext.getRequest().getInputStream();
this.output = response.getOutputStream();
CompletableFuture.allOf(inputComplete, outputComplete)
.whenComplete((ignoredResult, ignoredThrowable) -> asyncContext.complete());
// Dispatch setting the write listener to another thread.
executor.execute(() -> output.setWriteListener(this));
}
@Override
public void onDataAvailable() throws IOException
{
byte[] buffer = new byte[16 * 1024];
while (input.isReady())
{
if (input.read(buffer) < 0)
return;
}
}
@Override
public void onAllDataRead() throws IOException
{
inputComplete.complete(null);
}
@Override
public void onWritePossible() throws IOException
{
// Dispatch OWP to another thread.
executor.execute(() ->
{
while (output.isReady())
{
if (responseWritten.compareAndSet(false, true))
{
try
{
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType("text/plain;charset=utf-8");
output.write("Hello world".getBytes());
}
catch (IOException x)
{
throw new UncheckedIOException(x);
}
}
else
{
outputComplete.complete(null);
return;
}
}
});
}
@Override
public void onError(Throwable t)
{
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
asyncContext.complete();
}
}
}