WIP updates from review

Remove delayed dispatch, but keep other improvements
This commit is contained in:
gregw 2024-09-19 09:03:27 +10:00
parent cac0d62f51
commit a585cfcb23
9 changed files with 200 additions and 168 deletions

View File

@ -52,9 +52,12 @@ public abstract class AbstractConnection implements Connection, Invocable
_readCallback = new ReadCallback(); _readCallback = new ReadCallback();
} }
@Deprecated
@Override @Override
public InvocationType getInvocationType() public InvocationType getInvocationType()
{ {
// TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback
// so a connection need not be Invocable
return Invocable.super.getInvocationType(); return Invocable.super.getInvocationType();
} }

View File

@ -17,6 +17,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
@ -25,6 +26,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.MultiPartConfig; import org.eclipse.jetty.http.MultiPartConfig;
import org.eclipse.jetty.http.MultiPartFormData; import org.eclipse.jetty.http.MultiPartFormData;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.FormFields; import org.eclipse.jetty.server.FormFields;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
@ -102,7 +104,7 @@ public class DelayedHandler extends Handler.Wrapper
// if no mimeType, then no delay // if no mimeType, then no delay
if (mimeType == null) if (mimeType == null)
return null; return new UntilContentDelayedProcess(handler, request, response, callback);
// Otherwise, delay until a known content type is fully read; or if the type is not known then until the content is available // Otherwise, delay until a known content type is fully read; or if the type is not known then until the content is available
return switch (mimeType) return switch (mimeType)
@ -120,7 +122,7 @@ public class DelayedHandler extends Handler.Wrapper
yield new UntilMultipartDelayedProcess(handler, request, response, callback, contentType, config); yield new UntilMultipartDelayedProcess(handler, request, response, callback, contentType, config);
} }
default -> null; default -> new UntilContentDelayedProcess(handler, request, response, callback);
}; };
} }
@ -175,6 +177,64 @@ public class DelayedHandler extends Handler.Wrapper
protected abstract void delay() throws Exception; protected abstract void delay() throws Exception;
} }
protected static class UntilContentDelayedProcess extends DelayedProcess
{
public UntilContentDelayedProcess(Handler handler, Request request, Response response, Callback callback)
{
super(handler, request, response, callback);
}
@Override
protected void delay()
{
Content.Chunk chunk = super.getRequest().read();
if (chunk == null)
{
getRequest().demand(this::onContent);
}
else
{
RewindChunkRequest request = new RewindChunkRequest(getRequest(), chunk);
try
{
getHandler().handle(request, getResponse(), getCallback());
}
catch (Throwable x)
{
// Use the wrapped request so that the error handling can
// consume the request content and release the already read chunk.
Response.writeError(request, getResponse(), getCallback(), x);
}
}
}
public void onContent()
{
// We must execute here, because demand callbacks are serialized and process may block on a demand callback
getRequest().getContext().execute(this::process);
}
private static class RewindChunkRequest extends Request.Wrapper
{
private final AtomicReference<Content.Chunk> _chunk;
public RewindChunkRequest(Request wrapped, Content.Chunk chunk)
{
super(wrapped);
_chunk = new AtomicReference<>(chunk);
}
@Override
public Content.Chunk read()
{
Content.Chunk chunk = _chunk.getAndSet(null);
if (chunk != null)
return chunk;
return super.read();
}
}
}
protected static class UntilFormDelayedProcess extends DelayedProcess protected static class UntilFormDelayedProcess extends DelayedProcess
{ {
private final Charset _charset; private final Charset _charset;

View File

@ -709,23 +709,18 @@ public class HttpChannelState implements HttpChannel, Components
@Override @Override
public void succeeded() public void succeeded()
{ {
HttpStream stream = null; HttpStream stream;
boolean completeStream;
try (AutoLock ignored = _lock.lock()) try (AutoLock ignored = _lock.lock())
{ {
assert _callbackCompleted; assert _callbackCompleted;
assert _callbackFailure == null;
_streamSendState = StreamSendState.LAST_COMPLETE; _streamSendState = StreamSendState.LAST_COMPLETE;
if (_handling == null) completeStream = _handling == null;
{
stream = _stream; stream = _stream;
_stream = null;
// TODO remove this before merging
if (_callbackFailure != null)
throw new IllegalStateException("failure in succeeded", _callbackFailure);
}
} }
if (stream != null) if (completeStream)
completeStream(stream, null); completeStream(stream, null);
} }

View File

@ -104,7 +104,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private volatile RetainableByteBuffer _requestBuffer; private volatile RetainableByteBuffer _requestBuffer;
private HttpFields.Mutable _trailers; private HttpFields.Mutable _trailers;
private Runnable _onRequest; private Runnable _onRequest;
private boolean _delayedForContent;
private long _requests; private long _requests;
private long _responses; private long _responses;
private long _bytesIn; private long _bytesIn;
@ -536,13 +535,9 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
LOG.debug("filled {} {} {}", filled, _requestBuffer, this); LOG.debug("filled {} {} {}", filled, _requestBuffer, this);
if (filled > 0) if (filled > 0)
{
_bytesIn += filled; _bytesIn += filled;
}
else if (filled < 0) else if (filled < 0)
{
_parser.atEOF(); _parser.atEOF();
}
return filled; return filled;
} }
@ -598,30 +593,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
{ {
if (_httpChannel.getRequest() == null) if (_httpChannel.getRequest() == null)
return true; return true;
Runnable task = _httpChannel.onIdleTimeout(timeout);
Runnable task;
if (!_delayedForContent || _onRequest == null)
{
task = _httpChannel.onIdleTimeout(timeout);
}
else
{
Runnable onRequest = _onRequest;
_onRequest = null;
task = () ->
{
try
{
onRequest.run();
}
finally
{
_handling.set(false);
}
};
}
if (task != null) if (task != null)
getExecutor().execute(task); getExecutor().execute(task);
return false; // We've handle (or ignored) the timeout return false; // We've handle (or ignored) the timeout
@ -967,7 +939,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
throw new IllegalStateException("Stream pending"); throw new IllegalStateException("Stream pending");
_headerBuilder.clear(); _headerBuilder.clear();
_httpChannel.setHttpStream(stream); _httpChannel.setHttpStream(stream);
_delayedForContent = false;
} }
@Override @Override
@ -979,23 +950,8 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
@Override @Override
public boolean headerComplete() public boolean headerComplete()
{ {
HttpStreamOverHTTP1 stream = _stream.get(); _onRequest = _stream.get().headerComplete();
_onRequest = stream.headerComplete(); return true;
// Should we delay dispatch until we have some content?
if (getHttpConfiguration().isDelayDispatchUntilContent() &&
getEndPoint().getIdleTimeout() > 0 &&
(_parser.getContentLength() > 0 || _parser.isChunking()) &&
!stream._expects100Continue &&
!stream.isCommitted() &&
_requestBuffer != null && _requestBuffer.isEmpty())
{
// TODO should we max this to 1s?
getEndPoint().setIdleTimeout(getEndPoint().getIdleTimeout() / 2);
_delayedForContent = true;
}
return !_delayedForContent;
} }
@Override @Override
@ -1010,22 +966,15 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
_requestBuffer.retain(); _requestBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, _requestBuffer); stream._chunk = Content.Chunk.asChunk(buffer, false, _requestBuffer);
if (_delayedForContent)
{
_delayedForContent = false;
getEndPoint().setIdleTimeout(getEndPoint().getIdleTimeout() * 2);
}
return true; return true;
} }
@Override @Override
public boolean contentComplete() public boolean contentComplete()
{ {
// Do nothing at this point unless we delayed for content // Do nothing at this point.
// Wait for messageComplete so any trailers can be sent as special content // Wait for messageComplete so any trailers can be sent as special content
boolean delayed = _delayedForContent; return false;
_delayedForContent = false;
return delayed;
} }
@Override @Override

View File

@ -35,7 +35,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
@ -51,7 +50,6 @@ import org.eclipse.jetty.server.internal.HttpConnection;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.statistic.CounterStatistic;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -1127,86 +1125,6 @@ public class HttpConnectionTest
} }
} }
/**
* Ensure that excessively large hexadecimal chunk body length is parsed properly.
*/
@Test
public void testDelayedDispatch() throws Exception
{
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(true);
_server.start();
try (LocalConnector.LocalEndPoint connection = _connector.connect())
{
CounterStatistic dumpCounter = _server.getBean(DumpHandler.class).getHandledCounter();
// Dispatch with content
connection.addInput("""
POST /test HTTP/1.1\r
Host: localhost\r
Content-Length: 5\r
Content-Type: text/plain; charset=utf-8\r
\r
12345
"""
);
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getTotal() == 1L);
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getCurrent() == 0L);
String raw = connection.getResponse();
assertThat(raw, containsString("200 OK"));
// Dispatch delayed for content
dumpCounter.reset();
connection.addInput("""
POST /test HTTP/1.1\r
Host: localhost\r
Content-Length: 5\r
Content-Type: text/plain; charset=utf-8\r
\r
"""
);
Thread.sleep(10);
assertThat(dumpCounter.getTotal(), is(0L));
assertThat(dumpCounter.getCurrent(), is(0L));
connection.addInput("12345");
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getTotal() == 1L);
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getCurrent() == 0L);
raw = connection.getResponse();
assertThat(raw, containsString("200 OK"));
// Dispatch delayed for chunked content
dumpCounter.reset();
connection.addInput("""
POST /test HTTP/1.1\r
Host: localhost\r
Transfer-Encoding: chunked\r
Content-Type: text/plain; charset=utf-8\r
\r
"""
);
Thread.sleep(10);
assertThat(dumpCounter.getTotal(), is(0L));
assertThat(dumpCounter.getCurrent(), is(0L));
connection.addInput("""
5;\r
12345\r
0;\r
\r
""");
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getTotal() == 1L);
Awaitility.waitAtMost(1, TimeUnit.SECONDS).until(() -> dumpCounter.getCurrent() == 0L);
raw = connection.getResponse();
assertThat(raw, containsString("200 OK"));
}
}
/** /**
* Creates a request header over 1k in size, by creating a single header entry with an huge value. * Creates a request header over 1k in size, by creating a single header entry with an huge value.
* *

View File

@ -47,6 +47,8 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -168,6 +170,124 @@ public class DelayedHandlerTest
} }
} }
@Test
public void testDelayedUntilContent() throws Exception
{
DelayedHandler delayedHandler = new DelayedHandler();
_server.setHandler(delayedHandler);
CountDownLatch processing = new CountDownLatch(1);
delayedHandler.setHandler(new HelloHandler()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
// Check that we are not called via any demand callback
ByteArrayOutputStream out = new ByteArrayOutputStream(8192);
new Throwable().printStackTrace(new PrintStream(out));
String stack = out.toString(StandardCharsets.ISO_8859_1);
assertThat(stack, not(containsString("DemandContentCallback.succeeded")));
assertThat(stack, not(containsString("%s.%s".formatted(
DelayedHandler.UntilContentDelayedProcess.class.getSimpleName(),
DelayedHandler.UntilContentDelayedProcess.class.getMethod("onContent").getName()))));
processing.countDown();
return super.handle(request, response, callback);
}
});
_server.start();
try (Socket socket = new Socket("localhost", _connector.getLocalPort()))
{
String request = """
POST / HTTP/1.1\r
Host: localhost\r
Content-Length: 10\r
\r
""";
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertFalse(processing.await(250, TimeUnit.MILLISECONDS));
output.write("01234567\r\n".getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(processing.await(10, TimeUnit.SECONDS));
HttpTester.Input input = HttpTester.from(socket.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
String content = new String(response.getContentBytes(), StandardCharsets.UTF_8);
assertThat(content, containsString("Hello"));
}
}
@Test
public void testDelayedUntilContentInContext() throws Exception
{
ContextHandler context = new ContextHandler();
_server.setHandler(context);
DelayedHandler delayedHandler = new DelayedHandler();
context.setHandler(delayedHandler);
CountDownLatch processing = new CountDownLatch(1);
delayedHandler.setHandler(new HelloHandler()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
// Check that we are not called via any demand callback
ByteArrayOutputStream out = new ByteArrayOutputStream(8192);
new Throwable().printStackTrace(new PrintStream(out));
String stack = out.toString(StandardCharsets.ISO_8859_1);
assertThat(stack, not(containsString("DemandContentCallback.succeeded")));
assertThat(stack, not(containsString("%s.%s".formatted(
DelayedHandler.UntilContentDelayedProcess.class.getSimpleName(),
DelayedHandler.UntilContentDelayedProcess.class.getMethod("onContent").getName()))));
// Check the thread is in the context
assertThat(ContextHandler.getCurrentContext(), sameInstance(context.getContext()));
// Check the request is wrapped in the context
assertThat(request.getContext(), sameInstance(context.getContext()));
processing.countDown();
return super.handle(request, response, callback);
}
});
_server.start();
try (Socket socket = new Socket("localhost", _connector.getLocalPort()))
{
String request = """
POST / HTTP/1.1\r
Host: localhost\r
Content-Length: 10\r
\r
""";
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertFalse(processing.await(250, TimeUnit.MILLISECONDS));
output.write("01234567\r\n".getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(processing.await(10, TimeUnit.SECONDS));
HttpTester.Input input = HttpTester.from(socket.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
String content = new String(response.getContentBytes(), StandardCharsets.UTF_8);
assertThat(content, containsString("Hello"));
}
}
@Test @Test
public void testNoDelayWithContent() throws Exception public void testNoDelayWithContent() throws Exception
{ {

View File

@ -42,7 +42,6 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class ThreadLimitHandlerTest public class ThreadLimitHandlerTest
@ -281,16 +280,6 @@ public class ThreadLimitHandlerTest
@Override @Override
public void run() public void run()
{ {
// Read the first byte we know is there. This is to get around any delayed dispatch
if (read.get() == 0)
{
Content.Chunk chunk = request.read();
assertThat(chunk, notNullValue());
assertThat(chunk.remaining(), is(1));
read.incrementAndGet();
request.demand(this);
return;
}
count.incrementAndGet(); count.incrementAndGet();
try try
{ {
@ -342,7 +331,7 @@ public class ThreadLimitHandlerTest
for (int i = 0; i < client.length; i++) for (int i = 0; i < client.length; i++)
{ {
client[i] = new Socket("127.0.0.1", _connector.getLocalPort()); client[i] = new Socket("127.0.0.1", _connector.getLocalPort());
client[i].getOutputStream().write(("POST /" + i + " HTTP/1.0\r\nForwarded: for=1.2.3.4\r\nContent-Length: 3\r\n\r\nX").getBytes()); client[i].getOutputStream().write(("POST /" + i + " HTTP/1.0\r\nForwarded: for=1.2.3.4\r\nContent-Length: 2\r\n\r\n").getBytes());
client[i].getOutputStream().flush(); client[i].getOutputStream().flush();
} }
@ -355,7 +344,7 @@ public class ThreadLimitHandlerTest
// Send some content for the clients // Send some content for the clients
for (Socket socket : client) for (Socket socket : client)
{ {
socket.getOutputStream().write('Y'); socket.getOutputStream().write('X');
socket.getOutputStream().flush(); socket.getOutputStream().flush();
} }
@ -375,7 +364,7 @@ public class ThreadLimitHandlerTest
// Send the rest of the content for the clients // Send the rest of the content for the clients
for (Socket socket : client) for (Socket socket : client)
{ {
socket.getOutputStream().write('Z'); socket.getOutputStream().write('Y');
socket.getOutputStream().flush(); socket.getOutputStream().flush();
} }
@ -384,7 +373,7 @@ public class ThreadLimitHandlerTest
{ {
response = IO.toString(socket.getInputStream()); response = IO.toString(socket.getInputStream());
assertThat(response, containsString(" 200 OK")); assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString(" read 3")); assertThat(response, containsString(" read 2"));
} }
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0)); await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));

View File

@ -137,7 +137,6 @@ public class RequestReaderTest extends AbstractTest
} }
}); });
server.start(); server.start();
connector.setIdleTimeout(1000);
AtomicReference<Result> resultRef = new AtomicReference<>(); AtomicReference<Result> resultRef = new AtomicReference<>();
try (AsyncRequestContent content = new AsyncRequestContent()) try (AsyncRequestContent content = new AsyncRequestContent())

View File

@ -137,7 +137,6 @@ public class RequestReaderTest extends AbstractTest
} }
}); });
server.start(); server.start();
connector.setIdleTimeout(1000);
AtomicReference<Result> resultRef = new AtomicReference<>(); AtomicReference<Result> resultRef = new AtomicReference<>();
try (AsyncRequestContent content = new AsyncRequestContent()) try (AsyncRequestContent content = new AsyncRequestContent())