#6491 add defensive check and introduce HttpParser.isTerminated()

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-07-26 15:11:56 +02:00
parent a4053578cb
commit bd11d6f682
4 changed files with 108 additions and 3 deletions

View File

@ -46,7 +46,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -1874,6 +1878,82 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testHttpParserCloseWithAsyncReads(Scenario scenario) throws Exception
{
CountDownLatch serverOnErrorLatch = new CountDownLatch(1);
start(scenario, new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
if (request.getDispatcherType() != DispatcherType.REQUEST)
return;
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(2000); // allow async to timeout
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
asyncContext.complete();
serverOnErrorLatch.countDown();
}
});
// Close the parser to cause the issue.
org.eclipse.jetty.server.HttpConnection.getCurrentConnection().getParser().close();
}
});
server.start();
int length = 16;
ByteBuffer chunk1 = ByteBuffer.allocate(length / 2);
AsyncRequestContent content = new AsyncRequestContent(chunk1)
{
@Override
public long getLength()
{
return length;
}
};
CountDownLatch clientResultLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.method(HttpMethod.POST)
.body(content)
.send(result -> clientResultLatch.countDown());
Thread.sleep(1000);
ByteBuffer chunk2 = ByteBuffer.allocate(length / 2);
content.offer(chunk2);
content.close();
assertTrue(clientResultLatch.await(5, TimeUnit.SECONDS), "clientResultLatch didn't finish");
assertTrue(serverOnErrorLatch.await(5, TimeUnit.SECONDS), "serverOnErrorLatch didn't finish");
}
private void assertCopyRequest(Request original)
{
Request copy = client.copyRequest((HttpRequest)original, original.getURI());

View File

@ -209,6 +209,7 @@ public class HttpParser
private static final EnumSet<State> __idleStates = EnumSet.of(State.START, State.END, State.CLOSE, State.CLOSED);
private static final EnumSet<State> __completeStates = EnumSet.of(State.END, State.CLOSE, State.CLOSED);
private static final EnumSet<State> __terminatedStates = EnumSet.of(State.CLOSE, State.CLOSED);
private final boolean debugEnabled = LOG.isDebugEnabled(); // Cache debug to help branch prediction
private final HttpHandler _handler;
@ -424,6 +425,11 @@ public class HttpParser
return __completeStates.contains(_state);
}
public boolean isTerminated()
{
return __terminatedStates.contains(_state);
}
public boolean isState(State state)
{
return _state == state;
@ -1555,7 +1561,7 @@ public class HttpParser
if (debugEnabled && whiteSpace > 0)
LOG.debug("Discarded {} CR or LF characters", whiteSpace);
}
else if (isClose() || isClosed())
else if (isTerminated())
{
BufferUtil.clear(buffer);
}

View File

@ -90,7 +90,14 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
LOG.debug("needContent has content immediately available: {}", _content);
return true;
}
try
{
_httpConnection.parseAndFillForContent();
}
catch (Throwable x)
{
_content = new HttpInput.ErrorContent(x);
}
if (_content != null)
{
if (LOG.isDebugEnabled())
@ -111,8 +118,15 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
{
if (LOG.isDebugEnabled())
LOG.debug("produceContent has no content, parsing and filling");
try
{
_httpConnection.parseAndFillForContent();
}
catch (Throwable x)
{
_content = new HttpInput.ErrorContent(x);
}
}
HttpInput.Content result = _content;
if (result != null && !result.isSpecial())
_content = result.isEof() ? EOF : null;

View File

@ -329,6 +329,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
*/
void parseAndFillForContent()
{
// Defensive check to avoid an infinite select/wakeup/fillAndParseForContent/wait loop
// in case the parser was mistakenly closed and the connection was not aborted.
if (_parser.isTerminated())
throw new IllegalStateException("Parser is terminated: " + _parser);
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().