Issue #4376 Async Content Complete (#4377)

* Issue #4376 Async Content Complete

Added test harness to reproduce unready completing write.
Fixed test by not closing output prior to becoming READY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

ERROR state still needs to be closed!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

close after last blocking write

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

If completion has to do a flush, then we need a call to closed to
avoid leaking buffers.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

Reformat

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-12-03 08:50:36 +11:00 committed by GitHub
parent d99ae19201
commit 85cda88b40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 18 deletions

View File

@ -1195,10 +1195,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (_length > 0) if (_length > 0)
_combinedListener.onResponseContent(_request, _content); _combinedListener.onResponseContent(_request, _content);
if (_complete && _state.completeResponse()) if (_complete && _state.completeResponse())
{
_response.getHttpOutput().closed();
_combinedListener.onResponseEnd(_request); _combinedListener.onResponseEnd(_request);
}
super.succeeded(); super.succeeded();
} }
@ -1222,7 +1219,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override @Override
public void failed(Throwable th) public void failed(Throwable th)
{ {
_response.getHttpOutput().closed();
abort(x); abort(x);
super.failed(x); super.failed(x);
} }

View File

@ -968,6 +968,9 @@ public class HttpChannelState
} }
} }
// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();
if (event != null) if (event != null)
{ {
cancelTimeout(event); cancelTimeout(event);

View File

@ -234,6 +234,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{ {
write(content, complete, blocker); write(content, complete, blocker);
blocker.block(); blocker.block();
if (complete)
closed();
} }
catch (Exception failure) catch (Exception failure)
{ {
@ -403,13 +405,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
State state = _state.get(); State state = _state.get();
switch (state) switch (state)
{ {
case CLOSING:
{
if (!_state.compareAndSet(state, State.CLOSED))
break;
releaseBuffer();
return;
}
case CLOSED: case CLOSED:
{ {
return; return;
@ -417,7 +412,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY: case UNREADY:
{ {
if (_state.compareAndSet(state, State.ERROR)) if (_state.compareAndSet(state, State.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async closed") : _onError); {
if (_onError == null)
_onError = new EofException("Async closed");
releaseBuffer();
return;
}
break; break;
} }
default: default:

View File

@ -33,6 +33,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.http.HttpTester;
@ -41,6 +47,7 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -89,7 +96,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture
_delay.get(10, TimeUnit.SECONDS); _delay.get(10, TimeUnit.SECONDS);
getCallback().succeeded(); getCallback().succeeded();
} }
catch(Throwable th) catch (Throwable th)
{ {
th.printStackTrace(); th.printStackTrace();
getCallback().failed(th); getCallback().failed(th);
@ -97,7 +104,6 @@ public class AsyncCompletionTest extends HttpServerTestFixture
} }
} }
@BeforeEach @BeforeEach
public void init() throws Exception public void init() throws Exception
{ {
@ -153,7 +159,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture
@Override @Override
public void onCompleted() public void onCompleted()
{ {
COMPLETE.compareAndSet(false,true); COMPLETE.compareAndSet(false, true);
super.onCompleted(); super.onCompleted();
} }
} }
@ -163,7 +169,8 @@ public class AsyncCompletionTest extends HttpServerTestFixture
{ {
List<Object[]> tests = new ArrayList<>(); List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"}); tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499,"Test async sendError"), 499, "Test async sendError"}); tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data});
return tests.stream().map(Arguments::of); return tests.stream().map(Arguments::of);
} }
@ -197,7 +204,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture
// wait for threads to return to base level // wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while(_threadPool.getBusyThreads() != base) while (_threadPool.getBusyThreads() != base)
{ {
if (System.nanoTime() > end) if (System.nanoTime() > end)
throw new TimeoutException(); throw new TimeoutException();
@ -210,7 +217,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture
// proceed with the completion // proceed with the completion
delay.proceed(); delay.proceed();
while(!COMPLETE.get()) while (!COMPLETE.get())
{ {
if (System.nanoTime() > end) if (System.nanoTime() > end)
throw new TimeoutException(); throw new TimeoutException();
@ -218,4 +225,46 @@ public class AsyncCompletionTest extends HttpServerTestFixture
} }
} }
} }
private static class AsyncReadyCompleteHandler extends AbstractHandler
{
static String data = "Now is the time for all good men to come to the aid of the party";
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);
@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
{
if (bytes != null)
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
bytes = null;
}
else
{
context.complete();
return;
}
}
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
});
}
}
} }