Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-07-26 16:15:31 +02:00
parent 64a7dda668
commit 88c383e54b
1 changed files with 80 additions and 0 deletions

View File

@ -52,7 +52,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -1894,6 +1898,82 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testHttpParserCloseWithAsyncReads(Scenario scenario) throws Exception
{
CountDownLatch serverOnErrorLatch = new CountDownLatch(1);
start(scenario, new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
if (request.getDispatcherType() != DispatcherType.REQUEST)
return;
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(2000); // allow async to timeout
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
asyncContext.complete();
serverOnErrorLatch.countDown();
}
});
// Close the parser to cause the issue.
org.eclipse.jetty.server.HttpConnection.getCurrentConnection().getParser().close();
}
});
server.start();
int length = 16;
ByteBuffer chunk1 = ByteBuffer.allocate(length / 2);
DeferredContentProvider content = new DeferredContentProvider(chunk1)
{
@Override
public long getLength()
{
return length;
}
};
CountDownLatch clientResultLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.method(HttpMethod.POST)
.content(content)
.send(result -> clientResultLatch.countDown());
Thread.sleep(1000);
ByteBuffer chunk2 = ByteBuffer.allocate(length / 2);
content.offer(chunk2);
content.close();
assertTrue(clientResultLatch.await(5, TimeUnit.SECONDS), "clientResultLatch didn't finish");
assertTrue(serverOnErrorLatch.await(5, TimeUnit.SECONDS), "serverOnErrorLatch didn't finish");
}
private void assertCopyRequest(Request original)
{
Request copy = client.copyRequest((HttpRequest)original, original.getURI());