Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.1.x

# Conflicts:
#	jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
This commit is contained in:
gregw 2024-09-06 14:18:20 +10:00
commit fda0408e38
7 changed files with 222 additions and 186 deletions

11
Jenkinsfile vendored
View File

@ -39,6 +39,17 @@ pipeline {
} }
} }
stage("Build / Test - JDK23") {
agent { node { label 'linux' } }
steps {
timeout( time: 180, unit: 'MINUTES' ) {
checkout scm
mavenBuild( "jdk23", "clean install -Dspotbugs.skip=true -Djacoco.skip=true", "maven3")
recordIssues id: "jdk23", name: "Static Analysis jdk23", aggregatingResults: true, enabledForFailure: true, tools: [mavenConsole(), java(), checkStyle(), javaDoc()]
}
}
}
stage("Build / Test - JDK17") { stage("Build / Test - JDK17") {
agent { node { label 'linux' } } agent { node { label 'linux' } }
steps { steps {

View File

@ -699,7 +699,7 @@ public interface HttpFields extends Iterable<HttpField>, Supplier<HttpFields>
* @return the value of the field as a {@code long}, * @return the value of the field as a {@code long},
* or -1 if no such field is present * or -1 if no such field is present
* @throws NumberFormatException if the value of the field * @throws NumberFormatException if the value of the field
* cannot be converted to a {@link long} * cannot be converted to a {@code long}
*/ */
default long getLongField(String name) throws NumberFormatException default long getLongField(String name) throws NumberFormatException
{ {
@ -715,7 +715,7 @@ public interface HttpFields extends Iterable<HttpField>, Supplier<HttpFields>
* @return the value of the field as a {@code long}, * @return the value of the field as a {@code long},
* or -1 if no such field is present * or -1 if no such field is present
* @throws NumberFormatException if the value of the field * @throws NumberFormatException if the value of the field
* cannot be converted to a {@link long} * cannot be converted to a {@code long}
*/ */
default long getLongField(HttpHeader header) throws NumberFormatException default long getLongField(HttpHeader header) throws NumberFormatException
{ {

View File

@ -482,7 +482,8 @@ public class HttpChannelState implements HttpChannel, Components
} }
} }
// Consume content as soon as possible to open any flow control window. // Consume content as soon as possible to open any
// flow control window and release any request buffer.
Throwable unconsumed = stream.consumeAvailable(); Throwable unconsumed = stream.consumeAvailable();
if (unconsumed != null && LOG.isDebugEnabled()) if (unconsumed != null && LOG.isDebugEnabled())
LOG.debug("consuming content during error {}", unconsumed.toString()); LOG.debug("consuming content during error {}", unconsumed.toString());

View File

@ -19,6 +19,7 @@ import java.nio.channels.WritePendingException;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -103,7 +104,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private final LongAdder bytesOut = new LongAdder(); private final LongAdder bytesOut = new LongAdder();
private final AtomicBoolean _handling = new AtomicBoolean(false); private final AtomicBoolean _handling = new AtomicBoolean(false);
private final HttpFields.Mutable _headerBuilder = HttpFields.build(); private final HttpFields.Mutable _headerBuilder = HttpFields.build();
private volatile RetainableByteBuffer _retainableByteBuffer; private volatile RetainableByteBuffer _requestBuffer;
private HttpFields.Mutable _trailers; private HttpFields.Mutable _trailers;
private Runnable _onRequest; private Runnable _onRequest;
private long _requests; private long _requests;
@ -319,21 +320,18 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
public ByteBuffer onUpgradeFrom() public ByteBuffer onUpgradeFrom()
{ {
if (isRequestBufferEmpty()) if (isRequestBufferEmpty())
{
releaseRequestBuffer();
return null; return null;
} ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining());
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); unconsumed.put(_requestBuffer.getByteBuffer());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip(); unconsumed.flip();
releaseRequestBuffer();
return unconsumed; return unconsumed;
} }
@Override @Override
public void onUpgradeTo(ByteBuffer buffer) public void onUpgradeTo(ByteBuffer buffer)
{ {
BufferUtil.append(getRequestBuffer(), buffer); ensureRequestBuffer();
BufferUtil.append(_requestBuffer.getByteBuffer(), buffer);
} }
@Override @Override
@ -345,51 +343,49 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
void releaseRequestBuffer() void releaseRequestBuffer()
{ {
if (_retainableByteBuffer != null && _retainableByteBuffer.isEmpty()) if (LOG.isDebugEnabled())
{ LOG.debug("releasing request buffer {} {}", _requestBuffer, this);
if (LOG.isDebugEnabled()) _requestBuffer.release();
LOG.debug("releaseRequestBuffer {}", this); _requestBuffer = null;
RetainableByteBuffer buffer = _retainableByteBuffer;
_retainableByteBuffer = null;
if (!buffer.release())
throw new IllegalStateException("unreleased buffer " + buffer);
}
} }
private ByteBuffer getRequestBuffer() private void ensureRequestBuffer()
{ {
if (_retainableByteBuffer == null) if (_requestBuffer == null)
_retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); {
return _retainableByteBuffer.getByteBuffer(); _requestBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("request buffer acquired {} {}", _requestBuffer, this);
}
} }
public boolean isRequestBufferEmpty() public boolean isRequestBufferEmpty()
{ {
return _retainableByteBuffer == null || _retainableByteBuffer.isEmpty(); return _requestBuffer == null || !_requestBuffer.hasRemaining();
} }
@Override @Override
public void onFillable() public void onFillable()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(">>onFillable enter {} {} {}", this, _httpChannel, _retainableByteBuffer); LOG.debug("onFillable enter {} {} {}", _httpChannel, _requestBuffer, this);
HttpConnection last = setCurrentConnection(this); HttpConnection last = setCurrentConnection(this);
try try
{ {
ensureRequestBuffer();
// We must loop until we fill -1 or there is an async pause in handling. // We must loop until we fill -1 or there is an async pause in handling.
// Note that the endpoint might already be closed in some special circumstances. // Note that the endpoint might already be closed in some special circumstances.
while (true) while (true)
{ {
// Fill the request buffer (if needed).
int filled = fillRequestBuffer(); int filled = fillRequestBuffer();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onFillable filled {} {} {} {}", filled, this, _httpChannel, _retainableByteBuffer); LOG.debug("onFillable filled {} {} {} {}", filled, _httpChannel, _requestBuffer, this);
if (filled < 0 && getEndPoint().isOutputShutdown()) if (filled < 0 && getEndPoint().isOutputShutdown())
close(); close();
// Parse the request buffer.
boolean handle = parseRequestBuffer(); boolean handle = parseRequestBuffer();
// There could be a connection upgrade before handling // There could be a connection upgrade before handling
@ -397,52 +393,66 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
// If there was a connection upgrade, the other // If there was a connection upgrade, the other
// connection took over, nothing more to do here. // connection took over, nothing more to do here.
if (getEndPoint().getConnection() != this) if (getEndPoint().getConnection() != this)
{
releaseRequestBuffer();
break; break;
}
// Handle channel event. This will only be true when the headers of a request have been received. // The headers of a request have been received.
if (handle) if (handle)
{ {
Request request = _httpChannel.getRequest(); Request request = _httpChannel.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HANDLE {} {}", request, this); LOG.debug("HANDLE {} {}", request, this);
// handle the request by running the task obtained from onRequest // Handle the request by running the task.
_handling.set(true); _handling.set(true);
Runnable onRequest = _onRequest; Runnable onRequest = _onRequest;
_onRequest = null; _onRequest = null;
onRequest.run(); onRequest.run();
// If the _handling boolean has already been CaS'd to false, then stream is completed and we are no longer // If the CaS succeeds, then some thread is still handling the request.
// handling, so the caller can continue to fill and parse more connections. If it is still true, then some // If the CaS fails, then stream is completed, we are no longer handling,
// thread is still handling the request and they will need to organize more filling and parsing once complete. // so the caller can continue to fill and parse more connections.
if (_handling.compareAndSet(true, false)) if (_handling.compareAndSet(true, false))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("request !complete {} {}", request, this); LOG.debug("request !complete {} {} {}", request, _requestBuffer, this);
// Cannot release the request buffer here, because the
// application may read concurrently from another thread.
// The request buffer will be released by the application
// reading the request content, or by the implementation
// trying to consume the request content.
break; break;
} }
// If the request is complete, but has been upgraded, then break // If there was an upgrade, release and return.
if (getEndPoint().getConnection() != this) if (getEndPoint().getConnection() != this)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("upgraded {} -> {}", this, getEndPoint().getConnection()); LOG.debug("upgraded {} -> {}", this, getEndPoint().getConnection());
releaseRequestBuffer();
break; break;
} }
} }
else if (filled == 0)
{
assert isRequestBufferEmpty();
releaseRequestBuffer();
fillInterested();
break;
}
else if (filled < 0) else if (filled < 0)
{ {
assert isRequestBufferEmpty();
releaseRequestBuffer();
getEndPoint().shutdownOutput(); getEndPoint().shutdownOutput();
break; break;
} }
else if (_requestHandler._failure != null) else if (_requestHandler._failure != null)
{ {
// There was an error, don't fill more. // There was an error, don't fill more.
break; releaseRequestBuffer();
}
else if (filled == 0)
{
fillInterested();
break; break;
} }
} }
@ -453,11 +463,8 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("caught exception {} {}", this, _httpChannel, x); LOG.debug("caught exception {} {}", this, _httpChannel, x);
if (_retainableByteBuffer != null) if (_requestBuffer != null)
{
_retainableByteBuffer.clear();
releaseRequestBuffer(); releaseRequestBuffer();
}
} }
finally finally
{ {
@ -468,7 +475,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
{ {
setCurrentConnection(last); setCurrentConnection(last);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("<<onFillable exit {} {} {}", this, _httpChannel, _retainableByteBuffer); LOG.debug("onFillable exit {} {} {}", _httpChannel, _requestBuffer, this);
} }
} }
@ -478,70 +485,63 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
*/ */
void parseAndFillForContent() void parseAndFillForContent()
{ {
// Defensive check to avoid an infinite select/wakeup/fillAndParseForContent/wait loop ensureRequestBuffer();
// in case the parser was mistakenly closed and the connection was not aborted.
if (_parser.isTerminated())
{
_requestHandler.messageComplete();
return;
}
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().
// This loop was designed by a committee and voted by a majority.
while (_parser.inContentState()) while (_parser.inContentState())
{ {
if (parseRequestBuffer()) if (parseRequestBuffer())
break; break;
// Re-check the parser state after parsing to avoid filling,
// otherwise fillRequestBuffer() would acquire a ByteBuffer if (!_parser.inContentState())
// that may be leaked. {
if (_parser.inContentState() && fillRequestBuffer() <= 0) // The request is complete, and we are going to re-enter onFillable(),
// because either A: the request/response was completed synchronously
// so the onFillable() thread will loop, or B: the request/response
// was completed asynchronously, and the HttpStreamOverHTTP1 dispatches
// a call to onFillable() to process the next request.
// Therefore, there is no need to release the request buffer here,
// also because the buffer may contain pipelined requests.
break; break;
}
assert !_requestBuffer.hasRemaining();
if (_requestBuffer.isRetained())
{
// The application has retained the content chunks,
// reacquire the buffer to avoid overwriting the content.
releaseRequestBuffer();
ensureRequestBuffer();
}
int filled = fillRequestBuffer();
if (filled <= 0)
{
releaseRequestBuffer();
break;
}
} }
} }
private int fillRequestBuffer() private int fillRequestBuffer()
{ {
if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained())
{
// TODO this is almost certainly wrong
RetainableByteBuffer newBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("replace buffer {} <- {} in {}", _retainableByteBuffer, newBuffer, this);
_retainableByteBuffer.release();
_retainableByteBuffer = newBuffer;
}
if (!isRequestBufferEmpty()) if (!isRequestBufferEmpty())
return _retainableByteBuffer.remaining(); return _requestBuffer.remaining();
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not any possible legal threads calling #parseContent or #completed.
ByteBuffer requestBuffer = getRequestBuffer();
// fill
try try
{ {
ByteBuffer requestBuffer = _requestBuffer.getByteBuffer();
int filled = getEndPoint().fill(requestBuffer); int filled = getEndPoint().fill(requestBuffer);
if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections) if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(requestBuffer); filled = getEndPoint().fill(requestBuffer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer); LOG.debug("filled {} {} {}", filled, _requestBuffer, this);
if (filled > 0) if (filled > 0)
{
bytesIn.add(filled); bytesIn.add(filled);
} else if (filled < 0)
else _parser.atEOF();
{
if (filled < 0)
_parser.atEOF();
releaseRequestBuffer();
}
return filled; return filled;
} }
@ -550,11 +550,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), x); LOG.debug("Unable to fill from endpoint {}", getEndPoint(), x);
_parser.atEOF(); _parser.atEOF();
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.clear();
releaseRequestBuffer();
}
return -1; return -1;
} }
} }
@ -562,19 +557,15 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private boolean parseRequestBuffer() private boolean parseRequestBuffer()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} parse {}", this, _retainableByteBuffer); LOG.debug("parse {} {}", _requestBuffer, this);
if (_parser.isTerminated()) if (_parser.isTerminated())
throw new RuntimeIOException("Parser is terminated"); throw new RuntimeIOException("Parser is terminated");
boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getByteBuffer()); boolean handle = _parser.parseNext(_requestBuffer.getByteBuffer());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser); LOG.debug("parsed {} {} {} {}", handle, _parser, _requestBuffer, this);
// recycle buffer ?
if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained())
releaseRequestBuffer();
return handle; return handle;
} }
@ -631,19 +622,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
getExecutor().execute(this); getExecutor().execute(this);
} }
@Override
public void onClose(Throwable cause)
{
// TODO: do we really need to do this?
// This event is fired really late, sendCallback should already be failed at this point.
// Revisit whether we still need IteratingCallback.close().
if (cause == null)
_sendCallback.close();
else
_sendCallback.abort(cause);
super.onClose(cause);
}
@Override @Override
public void run() public void run()
{ {
@ -989,14 +967,14 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
public boolean content(ByteBuffer buffer) public boolean content(ByteBuffer buffer)
{ {
HttpStreamOverHTTP1 stream = _stream.get(); HttpStreamOverHTTP1 stream = _stream.get();
if (stream == null || stream._chunk != null || _retainableByteBuffer == null) if (stream == null || stream._chunk != null || _requestBuffer == null)
throw new IllegalStateException(); throw new IllegalStateException();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this); LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _requestBuffer, HttpConnection.this);
_retainableByteBuffer.retain(); _requestBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, _retainableByteBuffer); stream._chunk = Content.Chunk.asChunk(buffer, false, _requestBuffer);
return true; return true;
} }
@ -1540,6 +1518,11 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("abort due to pending read {} {} ", this, getEndPoint()); LOG.debug("abort due to pending read {} {} ", this, getEndPoint());
abort(new IOException("Pending read in onCompleted")); abort(new IOException("Pending read in onCompleted"));
_httpChannel.recycle();
_parser.reset();
_generator.reset();
if (!_handling.compareAndSet(true, false))
resume();
return; return;
} }
@ -1549,6 +1532,8 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
_httpChannel.recycle(); _httpChannel.recycle();
_parser.close(); _parser.close();
_generator.reset(); _generator.reset();
if (!_handling.compareAndSet(true, false))
releaseRequestBuffer();
return; return;
} }
@ -1580,40 +1565,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("non-current completion {}", this); LOG.debug("non-current completion {}", this);
// If we are looking for the next request resume();
if (_parser.isStart())
{
// if the buffer is empty
if (isRequestBufferEmpty())
{
// look for more data
fillInterested();
}
// else if we are still running
else if (getConnector().isRunning())
{
// Dispatched to handle a pipelined request
try
{
getExecutor().execute(HttpConnection.this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isRunning())
LOG.warn("Failed dispatch of {}", this, e);
else
LOG.trace("IGNORED", e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
// else the parser must be closed, so seek the EOF if we are still open
else if (getEndPoint().isOpen())
fillInterested();
} }
@Override @Override
@ -1629,6 +1581,30 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("aborting", x); LOG.debug("aborting", x);
abort(x); abort(x);
_httpChannel.recycle();
_parser.reset();
_generator.reset();
if (!_handling.compareAndSet(true, false))
resume();
}
private void resume()
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Resuming onFillable() {}", HttpConnection.this);
// Dispatch to handle pipelined requests.
Request request = _httpChannel.getRequest();
Executor executor = request == null ? getExecutor() : request.getComponents().getExecutor();
executor.execute(HttpConnection.this);
}
catch (RejectedExecutionException x)
{
getEndPoint().close(x);
// Resume by running, to release the request buffer.
run();
}
} }
private void abort(Throwable failure) private void abort(Throwable failure)

View File

@ -60,7 +60,7 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,7 +68,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -1869,8 +1868,8 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {false /* TODO, true */}) @CsvSource({"false,false", "false,true", "true,false", "true,true"})
public void testHoldContent(boolean close) throws Exception public void testHoldContent(boolean close, boolean pipeline) throws Exception
{ {
Queue<Content.Chunk> contents = new ConcurrentLinkedQueue<>(); Queue<Content.Chunk> contents = new ConcurrentLinkedQueue<>();
final int bufferSize = 1024; final int bufferSize = 1024;
@ -1881,6 +1880,15 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
@Override @Override
public boolean handle(Request request, Response response, Callback callback) throws Exception public boolean handle(Request request, Response response, Callback callback) throws Exception
{ {
if (LOG.isDebugEnabled())
LOG.debug("Handling request: {}", request);
if ("GET".equals(request.getMethod()))
{
response.setStatus(200);
callback.succeeded();
return true;
}
request.getConnectionMetaData().getConnection().addEventListener(new Connection.Listener() request.getConnectionMetaData().getConnection().addEventListener(new Connection.Listener()
{ {
@Override @Override
@ -1889,10 +1897,12 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
closed.countDown(); closed.countDown();
} }
}); });
while (true) while (true)
{ {
Content.Chunk chunk = request.read(); Content.Chunk chunk = request.read();
if (LOG.isDebugEnabled())
LOG.debug("read: {}", chunk);
if (chunk == null) if (chunk == null)
{ {
try (Blocker.Runnable blocker = Blocker.runnable()) try (Blocker.Runnable blocker = Blocker.runnable())
@ -1902,81 +1912,111 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
continue; continue;
} }
} }
if (chunk.hasRemaining()) if (chunk.hasRemaining())
contents.add(chunk); contents.add(chunk);
else else
chunk.release(); chunk.release();
if (chunk.isLast()) if (chunk.isLast())
break; break;
} }
response.setStatus(200); response.setStatus(200);
if (close) if (close)
{
LOG.info("Closing {}", request.getConnectionMetaData().getConnection().getEndPoint());
request.getConnectionMetaData().getConnection().getEndPoint().close(); request.getConnectionMetaData().getConnection().getEndPoint().close();
}
callback.succeeded(); callback.succeeded();
return true; return true;
} }
}); });
byte[] chunk = new byte[bufferSize / 2]; byte[] chunk = new byte[bufferSize / 2];
Arrays.fill(chunk, (byte)'X'); Arrays.fill(chunk, (byte)'X');
try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{ {
OutputStream os = client.getOutputStream(); OutputStream os = client.getOutputStream();
BufferedOutputStream out = new BufferedOutputStream(os, bufferSize); BufferedOutputStream out = new BufferedOutputStream(os, bufferSize);
out.write((""" String request = """
POST / HTTP/1.1\r POST / HTTP/1.1\r
Host: localhost\r Host: localhost\r
Connection: close\r Connection: %s\r
Transfer-Encoding: chunked\r Transfer-Encoding: chunked\r
\r \r
""").getBytes(StandardCharsets.ISO_8859_1)); """.formatted(pipeline ? "other" : "close");
if (LOG.isDebugEnabled())
LOG.debug("raw request {}", request);
out.write(request.getBytes(StandardCharsets.ISO_8859_1));
// single chunk // single chunk
out.write((Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); out.write((Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1));
out.write(chunk); out.write(chunk);
out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1)); out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1));
out.flush(); out.flush();
// double chunk (will overflow bufferSize)
// double chunk (will overflow)
out.write((Integer.toHexString(chunk.length * 2) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); out.write((Integer.toHexString(chunk.length * 2) + "\r\n").getBytes(StandardCharsets.ISO_8859_1));
out.write(chunk); out.write(chunk);
out.write(chunk); out.write(chunk);
out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1)); out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1));
out.flush(); out.flush();
// single chunk and end chunk // single chunk and end chunk plus optional pipelined request
out.write((Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); ByteBuffer last = BufferUtil.allocate(4096);
out.write(chunk); BufferUtil.append(last, (Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1));
out.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); BufferUtil.append(last, chunk);
BufferUtil.append(last, "\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
if (pipeline)
BufferUtil.append(last, "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
if (LOG.isDebugEnabled())
LOG.debug("last {}", BufferUtil.toString(last));
out.write(BufferUtil.toArray(last));
out.flush(); out.flush();
// check the response // check the response
if (!close) if (close)
{
assertThat(client.getInputStream().read(), equalTo(-1));
}
else
{ {
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response); assertNotNull(response);
assertThat(response.getStatus(), is(200)); assertThat(response.getStatus(), is(200));
if (pipeline)
{
response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
}
} }
} }
assertTrue(closed.await(10, TimeUnit.SECONDS)); assertTrue(closed.await(10, TimeUnit.SECONDS));
long total = contents.stream().mapToLong(Content.Chunk::remaining).sum(); long total = contents.stream().mapToLong(Content.Chunk::remaining).sum();
assertThat(total, equalTo(chunk.length * 4L)); assertThat(total, equalTo(chunk.length * 4L));
ByteBufferPool rbbp = _connector.getByteBufferPool(); ByteBufferPool rbbp = _connector.getByteBufferPool();
if (rbbp instanceof ArrayByteBufferPool pool) if (rbbp instanceof ArrayByteBufferPool pool)
{ {
long buffersBeforeRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount(); long buffersBeforeRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount();
if (LOG.isDebugEnabled())
{
LOG.debug("pool {}", pool);
contents.stream().map(Content.Chunk::toString).forEach(LOG::debug);
}
contents.forEach(Content.Chunk::release); contents.forEach(Content.Chunk::release);
long buffersAfterRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount();
assertThat(buffersAfterRelease, greaterThan(buffersBeforeRelease)); Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() ->
{
if (LOG.isDebugEnabled())
{
LOG.debug("pool {}", pool);
contents.stream().map(Content.Chunk::toString).forEach(LOG::debug);
}
long buffersAfterRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount();
return buffersAfterRelease > buffersBeforeRelease;
});
assertThat(pool.getAvailableDirectMemory() + pool.getAvailableHeapMemory(), greaterThanOrEqualTo(chunk.length * 4L)); assertThat(pool.getAvailableDirectMemory() + pool.getAvailableHeapMemory(), greaterThanOrEqualTo(chunk.length * 4L));
} }
else else

View File

@ -191,11 +191,13 @@ public class RequestListenersTest
Content-Length: 1 Content-Length: 1
Connection: close Connection: close
""", 2000 * idleTimeout, TimeUnit.MILLISECONDS)); """, 2 * idleTimeout, TimeUnit.MILLISECONDS));
int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500;
assertEquals(expectedStatus, response.getStatus()); assertEquals(expectedStatus, response.getStatus());
assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(failIdleTimeout && !succeedCallback)); // The failure listener is never invoked because completing the callback
// produces a response that completes the stream so the failure is ignored.
assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(false));
} }
@ParameterizedTest @ParameterizedTest

View File

@ -15,6 +15,8 @@ package org.eclipse.jetty.test.client.transport;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.StringRequestContent; import org.eclipse.jetty.client.StringRequestContent;
@ -26,6 +28,8 @@ import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -38,17 +42,17 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
public void testClientIdleTimeout(Transport transport) throws Exception public void testClientIdleTimeout(Transport transport) throws Exception
{ {
long serverIdleTimeout = idleTimeout * 2; long serverIdleTimeout = idleTimeout * 2;
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); AtomicReference<Callback> serverCallbackRef = new AtomicReference<>();
start(transport, new Handler.Abstract() start(transport, new Handler.Abstract()
{ {
@Override @Override
public boolean handle(Request request, Response response, Callback callback) public boolean handle(Request request, Response response, Callback callback)
{ {
// Do not succeed the callback if it's a timeout request. // Do not succeed the callback if it's a timeout request.
if (!Request.getPathInContext(request).equals("/timeout")) if (Request.getPathInContext(request).equals("/timeout"))
callback.succeeded(); request.addFailureListener(x -> serverCallbackRef.set(callback));
else else
request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); callback.succeeded();
return true; return true;
} }
}); });
@ -75,7 +79,8 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
assertEquals(HttpStatus.OK_200, response.getStatus()); assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources.
assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); Callback callback = await().atMost(2 * serverIdleTimeout, TimeUnit.MILLISECONDS).until(serverCallbackRef::get, notNullValue());
callback.failed(new TimeoutException());
} }
@ParameterizedTest @ParameterizedTest
@ -83,17 +88,17 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
public void testRequestIdleTimeout(Transport transport) throws Exception public void testRequestIdleTimeout(Transport transport) throws Exception
{ {
long serverIdleTimeout = idleTimeout * 2; long serverIdleTimeout = idleTimeout * 2;
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); AtomicReference<Callback> serverCallbackRef = new AtomicReference<>();
start(transport, new Handler.Abstract() start(transport, new Handler.Abstract()
{ {
@Override @Override
public boolean handle(Request request, Response response, Callback callback) throws Exception public boolean handle(Request request, Response response, Callback callback) throws Exception
{ {
// Do not succeed the callback if it's a timeout request. // Do not succeed the callback if it's a timeout request.
if (!Request.getPathInContext(request).equals("/timeout")) if (Request.getPathInContext(request).equals("/timeout"))
callback.succeeded(); request.addFailureListener(x -> serverCallbackRef.set(callback));
else else
request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); callback.succeeded();
return true; return true;
} }
}); });
@ -120,7 +125,8 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
assertEquals(HttpStatus.OK_200, response.getStatus()); assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources.
assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); Callback callback = await().atMost(2 * serverIdleTimeout, TimeUnit.MILLISECONDS).until(serverCallbackRef::get, notNullValue());
callback.failed(new TimeoutException());
} }
@ParameterizedTest @ParameterizedTest