Fixes #6168 - Improve handling of unconsumed content

Added or expanded the scope of catch blocks to properly handle exceptions thrown by `HttpInput.Interceptor`.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-04-13 16:54:53 +02:00 committed by GitHub
parent 57d0bae2f9
commit fe359ac117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 579 additions and 89 deletions

View File

@ -293,6 +293,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
BufferUtil.clear(_requestBuffer);
releaseRequestBuffer();
close();
}
finally
{
setCurrentConnection(last);
@ -322,10 +330,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)
{
LOG.warn("{} fill with unconsumed content!", this);
return 0;
}
throw new IllegalStateException("fill with unconsumed content on " + this);
if (BufferUtil.isEmpty(_requestBuffer))
{
@ -353,11 +358,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
catch (IOException e)
{
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug(e);
_parser.atEOF();
return -1;
}
}
return 0;
}

View File

@ -235,7 +235,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
produceContent();
}
catch (IOException e)
catch (Throwable e)
{
woken = failed(e);
}
@ -390,7 +390,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*
* @return Content or null
*/
protected Content nextNonSentinelContent()
protected Content nextNonSentinelContent() throws IOException
{
while (true)
{
@ -416,7 +416,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content produceNextContext() throws IOException
protected Content produceNextContent() throws IOException
{
Content content = nextInterceptedContent();
if (content == null && !isFinished())
@ -433,7 +433,7 @@ public class HttpInput extends ServletInputStream implements Runnable
*
* @return Content with remaining, a {@link SentinelContent}, or null
*/
protected Content nextInterceptedContent()
protected Content nextInterceptedContent() throws IOException
{
// If we have a chunk produced by interception
if (_intercepted != null)
@ -458,9 +458,10 @@ public class HttpInput extends ServletInputStream implements Runnable
// Are we intercepting?
if (_interceptor != null)
{
// Intercept the current content (may be called several
// times for the same content
_intercepted = _interceptor.readFrom(_content);
// Intercept the current content.
// The interceptor may be called several
// times for the same content.
_intercepted = intercept(_content);
// If interception produced new content
if (_intercepted != null && _intercepted != _content)
@ -492,6 +493,24 @@ public class HttpInput extends ServletInputStream implements Runnable
return null;
}
private Content intercept(Content content) throws IOException
{
try
{
return _interceptor.readFrom(content);
}
catch (Throwable x)
{
IOException failure = new IOException("Bad content", x);
content.failed(failure);
HttpChannel channel = _channelState.getHttpChannel();
Response response = channel.getResponse();
if (response.isCommitted())
channel.abort(failure);
throw failure;
}
}
private void consume(Content content)
{
if (!isError() && content instanceof EofContent)
@ -529,21 +548,6 @@ public class HttpInput extends ServletInputStream implements Runnable
return l;
}
/**
* Consumes the given content. Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = content.skip(length);
_contentConsumed += l;
if (l > 0 && content.isEmpty())
nextNonSentinelContent(); // hungry succeed
}
/**
* Blocks until some content or some end-of-file event arrives.
*
@ -620,10 +624,19 @@ public class HttpInput extends ServletInputStream implements Runnable
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);
if (nextInterceptedContent() != null)
return wakeup();
else
return false;
try
{
if (nextInterceptedContent() != null)
return wakeup();
else
return false;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("", x);
return failed(x);
}
}
}
}
@ -686,6 +699,7 @@ public class HttpInput extends ServletInputStream implements Runnable
* Consume all available content without blocking.
* Raw content is counted in the {@link #getContentReceived()} statistics, but
* is not intercepted nor counted in the {@link #getContentConsumed()} statistics
*
* @return True if EOF was reached, false otherwise.
*/
public boolean consumeAll()
@ -765,9 +779,9 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isReady()
{
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener == null)
return true;
@ -775,17 +789,19 @@ public class HttpInput extends ServletInputStream implements Runnable
return true;
if (_waitingForContent)
return false;
if (produceNextContext() != null)
if (produceNextContent() != null)
return true;
_channelState.onReadUnready();
_waitingForContent = true;
return false;
}
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
return true;
}
return false;
}
catch (IOException e)
{
LOG.ignore(e);
return true;
}
}
@ -793,9 +809,9 @@ public class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener)
{
boolean woken = false;
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener != null)
throw new IllegalStateException("ReadListener already set");
@ -808,7 +824,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
else
{
Content content = produceNextContext();
Content content = produceNextContent();
if (content != null)
{
_state = ASYNC;
@ -827,10 +843,13 @@ public class HttpInput extends ServletInputStream implements Runnable
}
}
}
}
catch (IOException e)
{
throw new RuntimeIOException(e);
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
woken = _channelState.onReadReady();
}
}
if (woken)
@ -895,49 +914,49 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public void run()
{
final ReadListener listener;
Throwable error;
ReadListener listener = null;
Throwable error = null;
boolean aeof = false;
synchronized (_inputQ)
{
listener = _listener;
if (_state == EOF)
return;
if (_state == AEOF)
{
_state = EOF;
aeof = true;
}
error = _state.getError();
if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
return;
// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
{
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
{
aeof = true;
_state = EOF;
}
}
}
}
try
{
synchronized (_inputQ)
{
listener = _listener;
if (_state == EOF)
return;
if (_state == AEOF)
{
_state = EOF;
aeof = true;
}
error = _state.getError();
if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
return;
// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
{
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
{
aeof = true;
_state = EOF;
}
}
}
}
if (error != null)
{
// TODO is this necessary to add here?
@ -958,7 +977,8 @@ public class HttpInput extends ServletInputStream implements Runnable
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug("", e);
try
{
if (aeof || error == null)
@ -1106,7 +1126,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
}
protected class ErrorState extends EOFState
protected static class ErrorState extends EOFState
{
final Throwable _error;
@ -1155,7 +1175,7 @@ public class HttpInput extends ServletInputStream implements Runnable
protected static final State ASYNC = new State()
{
@Override
public int noContent() throws IOException
public int noContent()
{
return 0;
}

View File

@ -0,0 +1,463 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.test;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.LifeCycle;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpInputInterceptorTest
{
private Server server;
private HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory();
private ServerConnector connector;
private HttpClient client;
private void start(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server, 1, 1, httpConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
client = new HttpClient();
server.addBean(client);
server.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(server);
}
@Test
public void testBlockingReadInterceptorThrows() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
throw new RuntimeException();
});
assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream()));
serverLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(new byte[1]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@Test
public void testBlockingReadInterceptorConsumesHalfThenThrows() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
// Consume some and then throw.
AtomicInteger readCount = new AtomicInteger();
jettyRequest.getHttpInput().addInterceptor(content ->
{
int reads = readCount.incrementAndGet();
if (reads == 1)
{
ByteBuffer buffer = content.getByteBuffer();
int half = buffer.remaining() / 2;
int limit = buffer.limit();
buffer.limit(buffer.position() + half);
ByteBuffer chunk = buffer.slice();
buffer.position(buffer.limit());
buffer.limit(limit);
return new HttpInput.Content(chunk);
}
throw new RuntimeException();
});
assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream()));
serverLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(new byte[1024]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testAsyncResponseWithoutReadingRequestContentWithInterceptorThatThrows(boolean commitResponse) throws Exception
{
AtomicLong onFillableCount = new AtomicLong();
httpConnectionFactory = new HttpConnectionFactory()
{
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
HttpConnection connection = new HttpConnection(getHttpConfiguration(), connector, endPoint, getHttpCompliance(), isRecordHttpComplianceViolations())
{
@Override
public void onFillable()
{
onFillableCount.incrementAndGet();
super.onFillable();
}
};
return configure(connection, connector, endPoint);
}
};
long delay = 500;
CountDownLatch contentLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
AtomicInteger readCount = new AtomicInteger();
jettyRequest.getHttpInput().addInterceptor(content ->
{
if (readCount.incrementAndGet() == 1)
{
// Tell the client to write more content.
contentLatch.countDown();
// Wait to let the content arrive to the server.
sleep(delay);
}
throw new RuntimeException();
});
AsyncContext asyncContext = request.startAsync();
response.getOutputStream().setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
if (commitResponse)
response.getOutputStream().close();
asyncContext.complete();
}
@Override
public void onError(Throwable error)
{
error.printStackTrace();
}
});
}
});
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort())))
{
// The request must have a content chunk so that it gets dispatched.
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"1\r\n" +
"A\r\n";
client.write(StandardCharsets.UTF_8.encode(request));
// Write the remaining content.
// This triggers to fill and parse again after consumeAll(),
// and we want to verify that the code does not spin.
assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
String content = "" +
"1\r\n" +
"X\r\n" +
"0\r\n" +
"\r\n";
client.write(StandardCharsets.UTF_8.encode(content));
// Wait and verify that we did not spin.
sleep(4 * delay);
assertThat(onFillableCount.get(), Matchers.lessThan(10L));
// Connection must be closed by the server.
Socket socket = client.socket();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
HttpTester.Response response = HttpTester.parseResponse(input);
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
try
{
while (true)
{
if (input.read() < 0)
break;
}
}
catch (IOException ignored)
{
// Java 8 may throw IOException: Connection reset by peer
// but that's ok (the server closed the connection).
}
}
}
@Test
public void testAvailableReadInterceptorThrows() throws Exception
{
CountDownLatch interceptorLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
interceptorLatch.countDown();
throw new RuntimeException();
});
int available = request.getInputStream().available();
assertEquals(0, available);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@Test
public void testIsReadyReadInterceptorThrows() throws Exception
{
byte[] bytes = new byte[]{13};
CountDownLatch interceptorLatch = new CountDownLatch(1);
CountDownLatch readFailureLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
AtomicBoolean onDataAvailable = new AtomicBoolean();
jettyRequest.getHttpInput().addInterceptor(content ->
{
if (onDataAvailable.get())
{
interceptorLatch.countDown();
throw new RuntimeException();
}
else
{
return content;
}
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
onDataAvailable.set(true);
// Now the interceptor should throw, but isReady() should not.
if (input.isReady())
{
assertThrows(IOException.class, () -> assertEquals(bytes[0], input.read()));
readFailureLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
asyncContext.complete();
}
}
@Override
public void onAllDataRead()
{
}
@Override
public void onError(Throwable error)
{
error.printStackTrace();
}
});
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(bytes))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertTrue(readFailureLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@Test
public void testSetReadListenerReadInterceptorThrows() throws Exception
{
RuntimeException failure = new RuntimeException();
CountDownLatch interceptorLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
interceptorLatch.countDown();
failure.addSuppressed(new Throwable());
throw failure;
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
}
@Override
public void onAllDataRead()
{
}
@Override
public void onError(Throwable error)
{
assertSame(failure, error.getCause());
response.setStatus(HttpStatus.NO_CONTENT_204);
asyncContext.complete();
}
});
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(new byte[1]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
private static void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
}