Issue #4331 - Improve handling of HttpOutput.close() for pending writes.
Added test case that verifies the current behavior (abort the response in case complete() is called with a pending write()). Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
9ab6a073fa
commit
d44a6935ef
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.http2.client;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -28,6 +30,7 @@ import javax.servlet.AsyncContext;
|
||||||
import javax.servlet.ReadListener;
|
import javax.servlet.ReadListener;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.ServletInputStream;
|
import javax.servlet.ServletInputStream;
|
||||||
|
import javax.servlet.WriteListener;
|
||||||
import javax.servlet.http.HttpServlet;
|
import javax.servlet.http.HttpServlet;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
@ -38,6 +41,9 @@ import org.eclipse.jetty.http2.api.Session;
|
||||||
import org.eclipse.jetty.http2.api.Stream;
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||||
|
import org.eclipse.jetty.server.HttpOutput;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.FuturePromise;
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -217,6 +223,66 @@ public class AsyncIOTest extends AbstractTest
|
||||||
assertEquals(2, count.get());
|
assertEquals(2, count.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectAsyncWriteThenComplete() throws Exception
|
||||||
|
{
|
||||||
|
// Use a small flow control window to stall the server writes.
|
||||||
|
int clientWindow = 16;
|
||||||
|
start(new EmptyHttpServlet()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||||
|
{
|
||||||
|
AsyncContext asyncContext = request.startAsync();
|
||||||
|
HttpOutput output = (HttpOutput)response.getOutputStream();
|
||||||
|
output.setWriteListener(new WriteListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onWritePossible() throws IOException
|
||||||
|
{
|
||||||
|
// The write is too large and will stall.
|
||||||
|
output.write(ByteBuffer.wrap(new byte[2 * clientWindow]));
|
||||||
|
|
||||||
|
// We cannot call complete() now before checking for isReady().
|
||||||
|
// This will abort the response and the client will receive a reset.
|
||||||
|
asyncContext.complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session session = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Map<Integer, Integer> onPreface(Session session)
|
||||||
|
{
|
||||||
|
Map<Integer, Integer> settings = new HashMap<>();
|
||||||
|
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, clientWindow);
|
||||||
|
return settings;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpFields fields = new HttpFields();
|
||||||
|
MetaData.Request metaData = newRequest("GET", fields);
|
||||||
|
HeadersFrame frame = new HeadersFrame(metaData, null, true);
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
|
session.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onReset(Stream stream, ResetFrame frame)
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
private static void sleep(long ms) throws InterruptedIOException
|
private static void sleep(long ms) throws InterruptedIOException
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
|
Loading…
Reference in New Issue