Improve handling of HttpInput.Interceptor behavior

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-04-15 15:31:32 +02:00
parent 4c98990cd9
commit 25467f8904
3 changed files with 381 additions and 11 deletions

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@ -302,12 +303,16 @@ class AsyncContentProducer implements ContentProducer
// In case the _rawContent was set by consumeAll(), check the httpChannel // In case the _rawContent was set by consumeAll(), check the httpChannel
// to see if it has a more precise error. Otherwise, the exact same // to see if it has a more precise error. Otherwise, the exact same
// special content will be returned by the httpChannel. // special content will be returned by the httpChannel; do not do that
HttpInput.Content refreshedRawContent = produceRawContent(); // if the _error flag was set, meaning the current error is definitive.
if (refreshedRawContent != null) if (!_error)
_rawContent = refreshedRawContent; {
HttpInput.Content refreshedRawContent = produceRawContent();
if (refreshedRawContent != null)
_rawContent = refreshedRawContent;
_error = _rawContent.getError() != null;
}
_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this); LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
return _rawContent; return _rawContent;
@ -317,7 +322,9 @@ class AsyncContentProducer implements ContentProducer
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("using interceptor to transform raw content {}", this); LOG.debug("using interceptor to transform raw content {}", this);
_transformedContent = _interceptor.readFrom(_rawContent); _transformedContent = intercept();
if (_error)
return _rawContent;
} }
else else
{ {
@ -369,6 +376,26 @@ class AsyncContentProducer implements ContentProducer
return _transformedContent; return _transformedContent;
} }
private HttpInput.Content intercept()
{
try
{
return _interceptor.readFrom(_rawContent);
}
catch (Throwable x)
{
IOException failure = new IOException("Bad content", x);
failCurrentContent(failure);
// Set the _error flag to mark the error as definitive, i.e.:
// do not try to produce new raw content to get a fresher error.
_error = true;
Response response = _httpChannel.getResponse();
if (response.isCommitted())
_httpChannel.abort(failure);
return null;
}
}
private HttpInput.Content produceRawContent() private HttpInput.Content produceRawContent()
{ {
HttpInput.Content content = _httpChannel.produceContent(); HttpInput.Content content = _httpChannel.produceContent();

View File

@ -298,6 +298,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
} }
} }
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
BufferUtil.clear(_requestBuffer);
releaseRequestBuffer();
getEndPoint().close(x);
}
finally finally
{ {
setCurrentConnection(last); setCurrentConnection(last);
@ -331,10 +339,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private int fillRequestBuffer() private int fillRequestBuffer()
{ {
if (_contentBufferReferences.get() > 0) if (_contentBufferReferences.get() > 0)
{ throw new IllegalStateException("fill with unconsumed content on " + this);
LOG.warn("{} fill with unconsumed content!", this);
return 0;
}
if (BufferUtil.isEmpty(_requestBuffer)) if (BufferUtil.isEmpty(_requestBuffer))
{ {
@ -362,7 +367,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
catch (IOException e) catch (IOException e)
{ {
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), e); if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), e);
_parser.atEOF(); _parser.atEOF();
return -1; return -1;
} }

View File

@ -0,0 +1,337 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpInputInterceptorTest
{
private Server server;
private HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory();
private ServerConnector connector;
private HttpClient client;
private void start(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server, 1, 1, httpConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
client = new HttpClient();
server.addBean(client);
server.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(server);
}
@Test
public void testBlockingReadInterceptorThrows() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
throw new RuntimeException();
});
assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream()));
serverLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(new BytesRequestContent(new byte[1]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@Test
public void testBlockingReadInterceptorConsumesHalfThenThrows() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.setHandled(true);
// Consume some and then throw.
AtomicInteger readCount = new AtomicInteger();
jettyRequest.getHttpInput().addInterceptor(content ->
{
int reads = readCount.incrementAndGet();
if (reads == 1)
{
ByteBuffer buffer = content.getByteBuffer();
int half = buffer.remaining() / 2;
int limit = buffer.limit();
buffer.limit(buffer.position() + half);
ByteBuffer chunk = buffer.slice();
buffer.position(buffer.limit());
buffer.limit(limit);
return new HttpInput.Content(chunk);
}
throw new RuntimeException();
});
assertThrows(IOException.class, () -> IO.readBytes(request.getInputStream()));
serverLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(new BytesRequestContent(new byte[1024]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@Test
public void testAvailableReadInterceptorThrows() throws Exception
{
CountDownLatch interceptorLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
interceptorLatch.countDown();
throw new RuntimeException();
});
int available = request.getInputStream().available();
assertEquals(0, available);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(new BytesRequestContent(new byte[1]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@Test
public void testIsReadyReadInterceptorThrows() throws Exception
{
AsyncRequestContent asyncRequestContent = new AsyncRequestContent(ByteBuffer.wrap(new byte[1]));
CountDownLatch interceptorLatch = new CountDownLatch(1);
CountDownLatch readFailureLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
AtomicBoolean onDataAvailable = new AtomicBoolean();
jettyRequest.getHttpInput().addInterceptor(content ->
{
if (onDataAvailable.get())
{
interceptorLatch.countDown();
throw new RuntimeException();
}
else
{
return content;
}
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
onDataAvailable.set(true);
// The input.setReadListener() call called the interceptor so there is content for read().
assertThat(input.isReady(), is(true));
assertDoesNotThrow(() -> assertEquals(0, input.read()));
// Make the client send more content so that the interceptor will be called again.
asyncRequestContent.offer(ByteBuffer.wrap(new byte[1]));
asyncRequestContent.close();
sleep(500); // Wait a little to make sure the content arrived by next isReady() call.
// The interceptor should throw, but isReady() should not.
assertThat(input.isReady(), is(true));
assertThrows(IOException.class, () -> assertEquals(0, input.read()));
readFailureLatch.countDown();
response.setStatus(HttpStatus.NO_CONTENT_204);
asyncContext.complete();
}
@Override
public void onAllDataRead()
{
}
@Override
public void onError(Throwable error)
{
error.printStackTrace();
}
});
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(asyncRequestContent)
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertTrue(readFailureLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
@Test
public void testSetReadListenerReadInterceptorThrows() throws Exception
{
RuntimeException failure = new RuntimeException();
CountDownLatch interceptorLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
// Throw immediately from the interceptor.
jettyRequest.getHttpInput().addInterceptor(content ->
{
interceptorLatch.countDown();
failure.addSuppressed(new Throwable());
throw failure;
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
}
@Override
public void onAllDataRead()
{
}
@Override
public void onError(Throwable error)
{
assertSame(failure, error.getCause());
response.setStatus(HttpStatus.NO_CONTENT_204);
asyncContext.complete();
}
});
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(new BytesRequestContent(new byte[1]))
.timeout(5, TimeUnit.SECONDS)
.send();
assertTrue(interceptorLatch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
}
private static void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
}