Merged branch 'jetty-10.0.x' into 'jetty-11.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-09-15 10:20:47 +02:00
commit 8dc823cd86
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
7 changed files with 115 additions and 187 deletions

View File

@ -27,10 +27,13 @@ This release process will produce releases:
- [ ] Verify target [project(s)](https://github.com/eclipse/jetty.project/projects) are complete.
- [ ] Verify that branch `jetty-10.0.x` is merged to branch `jetty-11.0.x`.
- [ ] Assign issue to "build manager", who will stage the releases.
+ [ ] Create and use branches `release/<ver>` to perform version specific release work from.
+ [ ] Ensure `VERSION.txt` additions for each release will be meaningful, descriptive, correct text.
+ [ ] Stage 9.4 release with Java 11.
+ [ ] Stage 10 release with Java 17.
+ [ ] Stage 11 release with Java 17.
+ [ ] Push release branches `release/<ver>` to to https://github.com/eclipse/jetty.project
+ [ ] Push release tags `jetty-<ver>` to https://github.com/eclipse/jetty.project
+ [ ] Edit a draft release (for each Jetty release) in GitHub (https://github.com/eclipse/jetty.project/releases). Content is generated with the "changelog tool".
- [ ] Assign issue to "test manager", who will oversee the testing of the staged releases.
+ [ ] Test [CometD](https://github.com/cometd/cometd).

View File

@ -285,6 +285,15 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
return committed;
}
@Override
public int dataSize()
{
try (AutoLock l = lock.lock())
{
return dataQueue == null ? 0 : dataQueue.size();
}
}
public boolean isOpen()
{
return !isClosed();
@ -921,13 +930,14 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
@Override
public String toString()
{
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(),
hashCode(),
getId(),
session.hashCode(),
sendWindow,
recvWindow,
dataSize(),
demand(),
localReset,
remoteReset,

View File

@ -142,6 +142,11 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isCommitted();
/**
* @return the size of the DATA frame queue
*/
int dataSize();
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/

View File

@ -186,7 +186,14 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onBeforeData(Stream stream)
{
// Don't demand here, as the initial demand is controlled by
// the application via DemandedContentListener.onBeforeContent().
}
@Override
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
channel.onData(frame, callback);

View File

@ -14,10 +14,7 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;
import org.eclipse.jetty.client.HttpChannel;
@ -42,9 +39,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,7 +63,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
@Override
protected void receive()
{
contentNotifier.process(true);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
contentNotifier.receive(getHttpChannel().getStream(), exchange);
}
@Override
@ -117,20 +116,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
upgrade(upgrader, httpResponse, endPoint);
}
contentNotifier.notifySuccess = frame.isEndStream();
if (responseHeaders(exchange))
{
int status = response.getStatus();
if (frame.isEndStream() || HttpStatus.isInterim(status))
responseSuccess(exchange);
}
else
{
if (frame.isEndStream())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
else
stream.demand(1);
}
}
}
@ -138,10 +131,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
if (((IStream)stream).dataSize() == 0)
responseSuccess(exchange);
else
contentNotifier.notifySuccess = true;
}
}
@ -194,13 +188,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
{
callback.failed(new IOException("terminated"));
}
else
{
notifyContent(exchange, frame, callback);
}
}
void onReset(Stream stream, ResetFrame frame)
@ -230,172 +220,62 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Received content {}", frame);
contentNotifier.process(getHttpChannel().getStream(), exchange, frame, callback);
}
private class ContentNotifier
private static class ContentNotifier
{
private final AutoLock lock = new AutoLock();
private final Queue<DataInfo> queue = new ArrayDeque<>();
private final HttpReceiverOverHTTP2 receiver;
private DataInfo dataInfo;
private boolean active;
private boolean resume;
private boolean stalled;
private volatile boolean notifySuccess;
private ContentNotifier(HttpReceiverOverHTTP2 receiver)
{
this.receiver = receiver;
}
private void offer(HttpExchange exchange, DataFrame frame, Callback callback)
public void receive(Stream stream, HttpExchange exchange)
{
DataInfo dataInfo = new DataInfo(exchange, frame, callback);
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", dataInfo);
enqueue(dataInfo);
process(false);
if (notifySuccess && ((IStream)stream).dataSize() == 0)
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
private void enqueue(DataInfo dataInfo)
private void process(Stream stream, HttpExchange exchange, DataFrame dataFrame, Callback callback)
{
try (AutoLock l = lock.lock())
if (dataFrame.getData().hasRemaining())
{
queue.offer(dataInfo);
}
}
private void process(boolean resume)
{
// Allow only one thread at a time.
boolean busy = active(resume);
if (LOG.isDebugEnabled())
LOG.debug("Resuming({}) processing({}) of content", resume, !busy);
if (busy)
return;
// Process only if there is demand.
try (AutoLock l = lock.lock())
{
if (!resume && demand() <= 0)
if (dataFrame.isEndStream())
notifySuccess = true;
boolean proceed = receiver.responseContent(exchange, dataFrame.getData(), Callback.from(callback::succeeded, x -> fail(callback, x)));
if (proceed)
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, content available but no demand");
active = false;
stalled = true;
return;
}
}
while (true)
{
if (dataInfo != null)
{
if (dataInfo.frame.isEndStream())
{
receiver.responseSuccess(dataInfo.exchange);
// Return even if active, as reset() will be called later.
return;
}
}
try (AutoLock l = lock.lock())
{
dataInfo = queue.poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing content {}", dataInfo);
if (dataInfo == null)
{
active = false;
return;
}
}
ByteBuffer buffer = dataInfo.frame.getData();
Callback callback = dataInfo.callback;
if (buffer.hasRemaining())
{
boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
if (!proceed)
{
// The call to responseContent() said we should
// stall, but another thread may have just resumed.
boolean stall = stall();
if (LOG.isDebugEnabled())
LOG.debug("Stalling({}) processing", stall);
if (stall)
return;
}
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
else
{
callback.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, no demand after {} on {}", dataFrame, this);
}
}
}
private boolean active(boolean resume)
{
try (AutoLock l = lock.lock())
else
{
if (active)
{
// There is a thread in process(),
// but it may be about to exit, so
// remember "resume" to signal the
// processing thread to continue.
if (resume)
this.resume = true;
return true;
}
// If there is no demand (i.e. stalled
// and not resuming) then don't process.
if (stalled && !resume)
return true;
// Start processing.
active = true;
stalled = false;
return false;
}
}
/**
* Called when there is no demand, this method checks whether
* the processing should really stop or it should continue.
*
* @return true to stop processing, false to continue processing
*/
private boolean stall()
{
try (AutoLock l = lock.lock())
{
if (resume)
{
// There was no demand, but another thread
// just demanded, continue processing.
resume = false;
return false;
}
// There is no demand, stop processing.
active = false;
stalled = true;
return true;
callback.succeeded();
if (dataFrame.isEndStream())
receiver.responseSuccess(exchange);
else
stream.demand(1);
}
}
private void reset()
{
dataInfo = null;
try (AutoLock l = lock.lock())
{
queue.clear();
active = false;
resume = false;
stalled = false;
}
notifySuccess = false;
}
private void fail(Callback callback, Throwable failure)
@ -403,25 +283,5 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
callback.failed(failure);
receiver.responseFailure(failure);
}
private class DataInfo
{
private final HttpExchange exchange;
private final DataFrame frame;
private final Callback callback;
private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
{
this.exchange = exchange;
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame);
}
}
}
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -40,6 +41,8 @@ import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@ -610,6 +613,43 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListener() throws Exception
{
var bytes = 100_000;
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
int streamId = stream.getId();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
Callback.Completable callback = new Callback.Completable();
stream.headers(responseFrame, callback);
callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(new byte[bytes]), true), Callback.NOOP));
return null;
}
});
var requestCount = 10_000;
IntStream.range(0, requestCount).forEach(i ->
{
try
{
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort()).headers(httpFields -> httpFields.put("X-Request-Id", Integer.toString(i))).send(listener);
Response response = listener.get(15, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(bytes, listener.getInputStream().readAllBytes().length);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
@Disabled
@Test
@Tag("external")

View File

@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class);
private boolean notifySuccess;
private volatile boolean notifySuccess;
protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
{
@ -86,6 +86,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
// TODO: add support for HttpMethod.CONNECT.
notifySuccess = frame.isLast();
if (responseHeaders(exchange))
{
int status = response.getStatus();
@ -98,7 +99,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after headers on {}", this);
notifySuccess = frame.isLast();
}
}
}
@ -118,12 +118,16 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
if (data.isLast())
notifySuccess = true;
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
@ -136,7 +140,6 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
notifySuccess = data.isLast();
}
}
else