Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2019-11-19 17:09:40 +01:00
commit 89fe1c38ef
1 changed files with 66 additions and 0 deletions

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -28,6 +30,7 @@ import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -38,6 +41,9 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.jupiter.api.Test;
@ -217,6 +223,66 @@ public class AsyncIOTest extends AbstractTest
assertEquals(2, count.get());
}
@Test
public void testDirectAsyncWriteThenComplete() throws Exception
{
// Use a small flow control window to stall the server writes.
int clientWindow = 16;
start(new EmptyHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
HttpOutput output = (HttpOutput)response.getOutputStream();
output.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
// The write is too large and will stall.
output.write(ByteBuffer.wrap(new byte[2 * clientWindow]));
// We cannot call complete() now before checking for isReady().
// This will abort the response and the client will receive a reset.
asyncContext.complete();
}
@Override
public void onError(Throwable t)
{
}
});
}
});
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, clientWindow);
return settings;
}
});
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
latch.countDown();
}
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private static void sleep(long ms) throws InterruptedIOException
{
try