Issue #10513 - fix multipart lockup with HTTP/2 (#10554)

* Changes the H2 semantic of `Stream.readData()` so that it is `readData()` that enlarges the flow control window, and not anymore the release of the `Stream.Data`.
This allows applications to buffer in memory by retaining the `Stream.Data` instances more than the H2 flow control window.
* Updated `FlowControlStrategyTest` after `Stream.readData()` semantic changes.
* Updated `DataDemandTest` after `Stream.readData()` semantic changes.
* Updated documentation.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: gregw <gregw@webtide.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Lachlan 2023-09-28 19:47:32 +10:00 committed by GitHub
parent 5be34089ef
commit aa880cff09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 421 additions and 454 deletions

View File

@ -61,11 +61,11 @@ Applications must call `Stream.demand()` to explicitly require that `onDataAvail
Applications that consume the content buffer within `onDataAvailable(Stream stream)` (for example, writing it to a file, or copying the bytes to another storage) should call `Data.release()` as soon as they have consumed the content buffer.
This allows the implementation to reuse the buffer, reducing the memory requirements needed to handle the content buffers.
Alternatively, a client application may store away the `Data` object to consume the buffer bytes later, or pass the `Data` object to another asynchronous API (this is typical in proxy applications).
Alternatively, an application may store away the `Data` object to consume the buffer bytes later, or pass the `Data` object to another asynchronous API (this is typical in proxy applications).
[IMPORTANT]
====
Calling `Data.release()` is very important not only to allow the implementation to reuse the buffer, but also tells the implementation to enlarge the stream and session flow control windows so that the sender will be able to send more `DATA` frames without stalling.
The call to `Stream.readData()` tells the implementation to enlarge the stream and session flow control windows so that the sender will be able to send more `DATA` frames without stalling.
====
Applications can unwrap the `Data` object into some other object that may be used later, provided that the _release_ semantic is maintained:

View File

@ -259,7 +259,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
// SPEC: the session window must be updated even if the stream is null.
// The flow control length includes the padding bytes.
int flowControlLength = frame.remaining() + frame.padding();
int flowControlLength = frame.flowControlLength();
flowControl.onDataReceived(this, stream, flowControlLength);
if (stream != null)
@ -278,11 +278,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
}
else
{
// StreamData has its own reference count (that starts at 1),
// so since we create it here, we release it after stream.process().
StreamData streamData = new StreamData(data, stream, flowControlLength);
stream.process(streamData);
streamData.release();
stream.process(data);
}
}
}
@ -292,7 +288,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
LOG.debug("Stream #{} not found on {}", streamId, this);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
dataConsumed(null, flowControlLength);
if (isStreamClosed(streamId))
reset(null, new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
else
@ -300,6 +296,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
}
}
void dataConsumed(HTTP2Stream stream, int length)
{
notIdle();
flowControl.onDataConsumed(this, stream, length);
}
private boolean isStreamClosed(int streamId)
{
return isLocalStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
@ -2361,7 +2363,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
}
}
private class Slot
private static class Slot
{
private volatile List<Entry> entries;
}
@ -2389,69 +2391,4 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
return false;
}
}
/**
* @implNote This class needs an extra reference counter because it needs to
* open the flow control window when the application releases this instance.
* Imagine a network buffer with 2 DATA frames: this will create 2 Data
* objects, which will be passed to the application. The network buffer is
* now retained 3 times (1 time for the network usage, and 1 time for each
* Data object).
* When the application releases the first Data object, the flow control
* window should be opened immediately for the length of that Data object,
* so the implementation cannot rely on delegating the call to release()
* to the network buffer, because the network buffer will still be retained.
* Furthermore, the flow control logic must be executed only once, while
* the Data object release() method may be invoked multiple times (since
* it may be additionally retained, for example when converted to a Chunk).
* The solution is to have an additional reference counter for the objects
* of this class, that allows to invoke the flow control logic only once,
* and only when all retains performed on an instance have been released.
*/
private class StreamData extends Stream.Data
{
private final ReferenceCounter counter = new ReferenceCounter();
private final Stream.Data data;
private final HTTP2Stream stream;
private final int flowControlLength;
private StreamData(Stream.Data data, HTTP2Stream stream, int flowControlLength)
{
super(data.frame());
this.data = data;
this.stream = stream;
this.flowControlLength = flowControlLength;
// Since this class starts its own reference counter
// at 1, we need to retain the delegate Data object,
// so that the releases will be paired.
data.retain();
}
@Override
public boolean canRetain()
{
return data.canRetain();
}
@Override
public void retain()
{
counter.retain();
data.retain();
}
@Override
public boolean release()
{
data.release();
boolean result = counter.release();
if (result)
{
notIdle();
stream.notIdle();
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
return result;
}
}
}

View File

@ -490,6 +490,13 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
if (LOG.isDebugEnabled())
LOG.debug("Reading {} for {}", data, this);
notIdle();
// Enlarge the flow control window now, since the application
// may want to retain the Data objects, accumulating them in
// memory beyond the flow control window, without copying them.
session.dataConsumed(this, data.frame().flowControlLength());
return data;
}

View File

@ -113,9 +113,10 @@ public interface Stream
* stream</li>
* </ul>
* <p>When the returned {@link Stream.Data} object is not {@code null},
* applications <em>must</em> call, either immediately or later (possibly
* asynchronously) {@link Stream.Data#release()} to notify the
* implementation that the bytes have been processed.</p>
* the flow control window has been enlarged by the DATA frame length;
* applications <em>must</em> call, either immediately or later (even
* asynchronously from a different thread) {@link Stream.Data#release()}
* to notify the implementation that the bytes have been processed.</p>
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been
* received).</p>

View File

@ -19,6 +19,7 @@ public class DataFrame extends StreamFrame
{
private final ByteBuffer data;
private final boolean endStream;
private final int length;
private final int padding;
public DataFrame(ByteBuffer data, boolean endStream)
@ -36,6 +37,7 @@ public class DataFrame extends StreamFrame
super(FrameType.DATA, streamId);
this.data = data;
this.endStream = endStream;
this.length = data.remaining();
this.padding = padding;
}
@ -65,6 +67,14 @@ public class DataFrame extends StreamFrame
return padding;
}
/**
* @return the flow control length, equivalent to the sum of data bytes and padding bytes
*/
public int flowControlLength()
{
return length + padding;
}
@Override
public DataFrame withStreamId(int streamId)
{
@ -74,6 +84,6 @@ public class DataFrame extends StreamFrame
@Override
public String toString()
{
return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), remaining(), isEndStream());
return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), length, isEndStream());
}
}

View File

@ -1,26 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.tests;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.FlowControlStrategy;
public class BufferingFlowControlStrategyTest extends FlowControlStrategyTest
{
@Override
protected FlowControlStrategy newFlowControlStrategy()
{
return new BufferingFlowControlStrategy(0.5F);
}
}

View File

@ -14,10 +14,6 @@
package org.eclipse.jetty.http2.tests;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -43,7 +39,6 @@ import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -57,13 +52,11 @@ public class DataDemandTest extends AbstractTest
{
int length = FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1;
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
Deque<Stream.Data> serverQueue = new ConcurrentLinkedDeque<>();
start(new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverStreamRef.set(stream);
MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, false), Callback.from(stream::demand));
return new Stream.Listener()
@ -71,9 +64,8 @@ public class DataDemandTest extends AbstractTest
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
// Don't demand and don't release.
serverQueue.offer(data);
// Don't read and don't demand.
serverStreamRef.set(stream);
}
};
}
@ -81,88 +73,78 @@ public class DataDemandTest extends AbstractTest
Session client = newClientSession(new Session.Listener() {});
MetaData.Request post = newRequest("POST", HttpFields.EMPTY);
FuturePromise<Stream> promise = new FuturePromise<>();
Queue<Stream.Data> clientQueue = new ConcurrentLinkedQueue<>();
client.newStream(new HeadersFrame(post, null, false), promise, new Stream.Listener()
AtomicReference<Stream> clientStreamRef = new AtomicReference<>();
client.newStream(new HeadersFrame(post, null, false), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
// Don't demand and don't release.
clientQueue.offer(data);
// Don't read and don't demand.
clientStreamRef.set(stream);
}
});
Stream clientStream = promise.get(5, TimeUnit.SECONDS);
// Send a single frame larger than the default frame size,
// so that it will be split on the server in multiple frames.
clientStream.data(new DataFrame(clientStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP);
// The server should receive only 1 DATA frame because it does 1 explicit demand.
// Wait a bit more to be sure it only receives 1 DATA frame.
Thread.sleep(1000);
assertEquals(1, serverQueue.size());
Stream serverStream = serverStreamRef.get();
assertNotNull(serverStream);
// Demand 1 more DATA frames.
serverStream.demand();
// The server should have received 1 more DATA frame.
await().atMost(1, TimeUnit.SECONDS).until(serverQueue::size, is(2));
// Demand all the rest.
AtomicInteger count = new AtomicInteger(serverQueue.size());
while (true)
}).thenCompose(s ->
{
serverStream.demand();
await().atMost(1, TimeUnit.SECONDS).until(() -> serverQueue.size() == count.get() + 1);
count.incrementAndGet();
long sum = serverQueue.stream()
.mapToLong(data -> data.frame().getByteBuffer().remaining())
.sum();
if (sum == length)
break;
}
// Send a single frame larger than the default frame size,
// so that it will be split on the server in multiple frames.
return s.data(new DataFrame(s.getId(), ByteBuffer.allocate(length), true));
});
// Even if demanded, the flow control window should not have
// decreased because the callbacks have not been completed.
int recvWindow = ((HTTP2Session)serverStream.getSession()).updateRecvWindow(0);
assertEquals(FlowControlStrategy.DEFAULT_WINDOW_SIZE - length, recvWindow);
// The server onDataAvailable() should be invoked once because it does one explicit demand.
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null);
Stream serverStream = serverStreamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() == null);
// Release them all.
serverQueue.forEach(Stream.Data::release);
// Read and demand 1 more DATA frame.
Stream.Data data = serverStream.readData();
assertNotNull(data);
AtomicInteger serverReceived = new AtomicInteger(data.frame().remaining());
data.release();
serverStream.demand();
// The server onDataAvailable() should be invoked.
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null);
// Read all the rest.
await().pollInterval(1, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() ->
{
Stream.Data d = serverStream.readData();
if (d == null)
return false;
serverReceived.addAndGet(d.frame().remaining());
d.release();
return d.frame().isEndStream();
});
assertEquals(length, serverReceived.get());
// Send a large DATA frame to the client.
serverStream.data(new DataFrame(serverStream.getId(), ByteBuffer.allocate(length), true), Callback.NOOP);
serverStream.data(new DataFrame(serverStream.getId(), ByteBuffer.allocate(length), true));
// The client should receive only 1 DATA frame because it does explicit demand.
// Wait a bit more to be sure it only receives 1 DATA frame.
Thread.sleep(1000);
assertEquals(1, clientQueue.size());
// The client onDataAvailable() should be invoked once because it does one explicit demand.
await().atMost(5, TimeUnit.SECONDS).until(() -> clientStreamRef.get() != null);
Stream clientStream = clientStreamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> clientStreamRef.get() == null);
// Demand 1 more DATA frames.
// Read and demand 1 more DATA frame.
data = clientStream.readData();
assertNotNull(data);
AtomicInteger clientReceived = new AtomicInteger(data.frame().remaining());
data.release();
clientStream.demand();
Thread.sleep(1000);
// The client should have received 1 more DATA frame.
assertEquals(2, clientQueue.size());
// Demand all the rest.
count.set(clientQueue.size());
while (true)
// The client onDataAvailable() should be invoked.
await().atMost(5, TimeUnit.SECONDS).until(() -> clientStreamRef.get() != null);
// Read all the rest.
await().pollInterval(1, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() ->
{
clientStream.demand();
await().atMost(1, TimeUnit.SECONDS).until(() -> clientQueue.size() == count.get() + 1);
count.incrementAndGet();
long sum = clientQueue.stream()
.mapToLong(data -> data.frame().getByteBuffer().remaining())
.sum();
if (sum == length)
break;
}
// Release them all.
clientQueue.forEach(Stream.Data::release);
Stream.Data d = clientStream.readData();
if (d == null)
return false;
clientReceived.addAndGet(d.frame().remaining());
d.release();
return d.frame().isEndStream();
});
assertEquals(length, clientReceived.get());
// Both the client and server streams should be gone now.
assertNull(clientStream.getSession().getStream(clientStream.getId()));

View File

@ -22,9 +22,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -41,6 +39,7 @@ import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -62,25 +61,22 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public abstract class FlowControlStrategyTest
public class FlowControlStrategyTest
{
protected ServerConnector connector;
protected HTTP2Client client;
protected Server server;
private ServerConnector connector;
private HTTP2Client client;
private Server server;
protected abstract FlowControlStrategy newFlowControlStrategy();
protected void start(ServerSessionListener listener) throws Exception
protected void start(FlowControlStrategyType type, ServerSessionListener listener) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
@ -88,7 +84,7 @@ public abstract class FlowControlStrategyTest
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setFlowControlStrategyFactory(FlowControlStrategyTest.this::newFlowControlStrategy);
connectionFactory.setFlowControlStrategyFactory(() -> newFlowControlStrategy(type));
connector = new ServerConnector(server, connectionFactory);
server.addConnector(connector);
server.start();
@ -99,7 +95,7 @@ public abstract class FlowControlStrategyTest
client.setExecutor(clientExecutor);
client.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
client.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
client.setFlowControlStrategyFactory(FlowControlStrategyTest.this::newFlowControlStrategy);
client.setFlowControlStrategyFactory(() -> newFlowControlStrategy(type));
client.start();
}
@ -121,6 +117,15 @@ public abstract class FlowControlStrategyTest
return new MetaData.Request(method, HttpScheme.HTTP.asString(), new HostPortHttpField(authority), "/", HttpVersion.HTTP_2, fields, -1);
}
protected FlowControlStrategy newFlowControlStrategy(FlowControlStrategyType type)
{
return switch (type)
{
case SIMPLE -> new SimpleFlowControlStrategy();
case BUFFERING -> new BufferingFlowControlStrategy(0.5F);
};
}
@AfterEach
public void dispose() throws Exception
{
@ -130,14 +135,15 @@ public abstract class FlowControlStrategyTest
server.stop();
}
@Test
public void testWindowSizeUpdates() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testWindowSizeUpdates(FlowControlStrategyType type) throws Exception
{
CountDownLatch prefaceLatch = new CountDownLatch(1);
CountDownLatch stream1Latch = new CountDownLatch(1);
CountDownLatch stream2Latch = new CountDownLatch(1);
CountDownLatch settingsLatch = new CountDownLatch(1);
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
@ -220,59 +226,47 @@ public abstract class FlowControlStrategyTest
assertTrue(stream2Latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testFlowControlWithConcurrentSettings() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testFlowControlWithConcurrentSettings(FlowControlStrategyType type) throws Exception
{
// Initial window is 64 KiB. We allow the client to send 1024 B
// then we change the window to 512 B. At this point, the client
// must stop sending data (although the initial window allows it).
int size = 512;
// We get 3 data frames: the first of 1024 and 2 of 512 each
// after the flow control window has been reduced.
CountDownLatch dataLatch = new CountDownLatch(3);
AtomicReference<Stream.Data> dataRef = new AtomicReference<>();
start(new ServerSessionListener()
AtomicInteger dataAvailable = new AtomicInteger();
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
serverStreamRef.set(stream);
MetaData.Response response = new MetaData.Response(200, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
stream.demand();
return new Stream.Listener()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
dataLatch.countDown();
int dataFrameCount = dataFrames.incrementAndGet();
if (dataFrameCount == 1)
if (dataAvailable.incrementAndGet() == 1)
{
dataRef.set(data);
// Do not read so the flow control window is not enlarged.
// Send the update on the flow control window.
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, size);
stream.getSession().settings(new SettingsFrame(settings, false), Callback.NOOP);
stream.demand();
// Do not release the data here.
}
else if (dataFrameCount > 1)
{
// Release the data.
data.release();
if (!data.frame().isEndStream())
stream.demand();
// Since we did not read, don't demand, otherwise we will be called again.
}
}
};
}
});
// Two SETTINGS frames, the initial one and the one we send from the server.
// Two SETTINGS frames, the initial one and the one for the flow control window update.
CountDownLatch settingsLatch = new CountDownLatch(2);
Session session = newClient(new Session.Listener()
{
@ -284,35 +278,35 @@ public abstract class FlowControlStrategyTest
});
MetaData.Request request = newRequest("POST", HttpFields.EMPTY);
FuturePromise<Stream> promise = new FuturePromise<>();
session.newStream(new HeadersFrame(request, null, false), promise, null);
Stream stream = promise.get(5, TimeUnit.SECONDS);
Stream stream = session.newStream(new HeadersFrame(request, null, false), null)
.get(5, TimeUnit.SECONDS);
// Send first chunk that exceeds the window.
// Send first chunk that will exceed the flow control window when the new SETTINGS is received.
CompletableFuture<Stream> completable = stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false));
settingsLatch.await(5, TimeUnit.SECONDS);
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
completable.thenAccept(s ->
{
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
// Send the second chunk of data, must not leave the client since it is flow control stalled.
s.data(new DataFrame(s.getId(), ByteBuffer.allocate(size * 2), true));
});
assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Verify that the server only received one data available notification.
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> dataAvailable.get() == 1);
// Release the data arrived to server, this will resume flow control on the client.
dataRef.get().release();
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
// Now read from the server, so the flow control window
// is enlarged and the client can resume sending.
consumeAll(serverStreamRef.get());
}
@Test
public void testServerFlowControlOneBigWrite() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testServerFlowControlOneBigWrite(FlowControlStrategyType type) throws Exception
{
int windowSize = 1536;
int length = 5 * windowSize;
CountDownLatch settingsLatch = new CountDownLatch(2);
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
@ -350,72 +344,56 @@ public abstract class FlowControlStrategyTest
return flow.getInitialStreamRecvWindow() == windowSize;
});
CountDownLatch dataLatch = new CountDownLatch(1);
Exchanger<Stream.Data> exchanger = new Exchanger<>();
AtomicReference<Stream> streamRef = new AtomicReference<>();
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener()
session.newStream(requestFrame, new Stream.Listener()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1 || dataFrames == 2)
{
// Do not release the Data.
// We should then be flow-control stalled.
exchanger.exchange(data);
stream.demand();
}
else if (dataFrames == 3 || dataFrames == 4 || dataFrames == 5)
{
// Consume totally.
data.release();
if (data.frame().isEndStream())
dataLatch.countDown();
else
stream.demand();
}
else
{
fail("Unrecognized dataFrames: " + dataFrames);
}
}
catch (InterruptedException x)
{
data.release();
}
// Do not read to stall the server.
streamRef.set(stream);
}
});
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef.get() != null);
Stream.Data data = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
// Did not read yet, verify that we are flow control stalled.
Stream stream = streamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> streamRef.get() == null);
// Release the first chunk.
// Read the first chunk.
Stream.Data data = stream.readData();
assertNotNull(data);
data.release();
data = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
// Did not demand, so onDataAvailable() should not be called.
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> streamRef.get() == null);
// Release the second chunk.
// Demand, onDataAvailable() should be called.
stream.demand();
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef.get() != null);
// Did not read yet, verify that we are flow control stalled.
stream = streamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> streamRef.get() == null);
// Read the second chunk.
data = stream.readData();
assertNotNull(data);
data.release();
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
consumeAll(stream);
}
@Test
public void testClientFlowControlOneBigWrite() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testClientFlowControlOneBigWrite(FlowControlStrategyType type) throws Exception
{
int windowSize = 1536;
Exchanger<Stream.Data> exchanger = new Exchanger<>();
CountDownLatch dataLatch = new CountDownLatch(1);
AtomicReference<HTTP2Session> serverSessionRef = new AtomicReference<>();
start(new ServerSessionListener()
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
start(type, new ServerSessionListener()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
@ -435,40 +413,11 @@ public abstract class FlowControlStrategyTest
stream.demand();
return new Stream.Listener()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
int dataFrames = this.dataFrames.incrementAndGet();
if (dataFrames == 1 || dataFrames == 2)
{
// Do not consume the data frame.
// We should then be flow-control stalled.
exchanger.exchange(data);
stream.demand();
}
else if (dataFrames == 3 || dataFrames == 4 || dataFrames == 5)
{
// Consume totally.
data.release();
if (data.frame().isEndStream())
dataLatch.countDown();
else
stream.demand();
}
else
{
fail("Unrecognized dataFrames: " + dataFrames);
}
}
catch (InterruptedException x)
{
data.release();
}
// Do not read to stall the server.
serverStreamRef.set(stream);
}
};
}
@ -484,40 +433,51 @@ public abstract class FlowControlStrategyTest
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
clientSession.newStream(requestFrame, streamPromise, null);
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
clientSession.newStream(requestFrame, null)
.thenCompose(s ->
{
int length = 5 * windowSize;
DataFrame dataFrame = new DataFrame(s.getId(), ByteBuffer.allocate(length), true);
return s.data(dataFrame);
});
int length = 5 * windowSize;
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.NOOP);
// Verify that the data arrived to the server.
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null);
Stream.Data data = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
// Did not read yet, verify that we are flow control stalled.
Stream serverStream = serverStreamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() == null);
// Consume the first chunk.
// Read the first chunk.
Stream.Data data = serverStream.readData();
assertNotNull(data);
data.release();
data = exchanger.exchange(null, 5, TimeUnit.SECONDS);
checkThatWeAreFlowControlStalled(exchanger);
// Did not demand, so onDataAvailable() should not be called.
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() == null);
// Consume the second chunk.
// Demand, onDataAvailable() should be called.
serverStream.demand();
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null);
// Did not read yet, verify that we are flow control stalled.
serverStream = serverStreamRef.getAndSet(null);
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() == null);
// Read the second chunk.
data = serverStream.readData();
assertNotNull(data);
data.release();
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
consumeAll(serverStream);
}
private void checkThatWeAreFlowControlStalled(Exchanger<Stream.Data> exchanger)
{
assertThrows(TimeoutException.class,
() -> exchanger.exchange(null, 1, TimeUnit.SECONDS));
}
@Test
public void testSessionStalledStallsNewStreams() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testSessionStalledStallsNewStreams(FlowControlStrategyType type) throws Exception
{
int windowSize = 1024;
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -554,95 +514,92 @@ public abstract class FlowControlStrategyTest
Session session = newClient(new Session.Listener() {});
// First request is just to consume most of the session window.
List<Stream.Data> dataList1 = new ArrayList<>();
CountDownLatch prepareLatch = new CountDownLatch(1);
AtomicReference<Stream> streamRef1 = new AtomicReference<>();
MetaData.Request request1 = newRequest("POST", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
// Do not consume the data to reduce the session window.
dataList1.add(data);
if (data.frame().isEndStream())
prepareLatch.countDown();
else
stream.demand();
// Do not read to stall flow control.
streamRef1.set(stream);
}
});
assertTrue(prepareLatch.await(5, TimeUnit.SECONDS));
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef1.get() != null);
// Second request will consume half of the remaining the session window.
List<Stream.Data> dataList2 = new ArrayList<>();
AtomicReference<Stream> streamRef2 = new AtomicReference<>();
MetaData.Request request2 = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request2, null, true), new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (!data.frame().isEndStream())
stream.demand();
// Do not release it to stall flow control.
dataList2.add(data);
// Do not read to stall flow control.
streamRef2.set(stream);
}
});
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef2.get() != null);
// Third request will consume the whole session window, which is now stalled.
// A fourth request will not be able to receive data.
List<Stream.Data> dataList3 = new ArrayList<>();
// A fourth request will not be able to receive data because the server is stalled.
AtomicReference<Stream> streamRef3 = new AtomicReference<>();
MetaData.Request request3 = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request3, null, true), new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (!data.frame().isEndStream())
stream.demand();
// Do not release it to stall flow control.
dataList3.add(data);
// Do not read to stall flow control.
streamRef3.set(stream);
}
});
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef3.get() != null);
// Fourth request is now stalled.
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Stream> streamRef4 = new AtomicReference<>();
MetaData.Request request4 = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<>(), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
latch.countDown();
else
stream.demand();
streamRef4.set(stream);
}
});
// Verify that the data does not arrive because the server session is stalled.
assertFalse(latch.await(1, TimeUnit.SECONDS));
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(() -> streamRef4.get() == null);
// Consume the data of the first response.
// This will open up the session window, allowing the fourth stream to send data.
dataList1.forEach(Stream.Data::release);
consumeAll(streamRef1.get());
await().atMost(5, TimeUnit.SECONDS).until(() -> streamRef4.get() != null);
assertTrue(latch.await(5, TimeUnit.SECONDS));
dataList2.forEach(Stream.Data::release);
dataList3.forEach(Stream.Data::release);
consumeAll(streamRef2.get());
consumeAll(streamRef3.get());
consumeAll(streamRef4.get());
}
@Test
public void testServerSendsBigContent() throws Exception
private void consumeAll(Stream stream) throws Exception
{
await().pollInterval(1, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() ->
{
Stream.Data data = stream.readData();
if (data == null)
return false;
data.release();
return data.frame().isEndStream();
});
}
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testServerSendsBigContent(FlowControlStrategyType type) throws Exception
{
byte[] data = new byte[1024 * 1024];
new Random().nextBytes(data);
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
@ -684,10 +641,11 @@ public abstract class FlowControlStrategyTest
assertArrayEquals(data, bytes);
}
@Test
public void testClientSendingInitialSmallWindow() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testClientSendingInitialSmallWindow(FlowControlStrategyType type) throws Exception
{
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -735,24 +693,24 @@ public abstract class FlowControlStrategyTest
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
responseContent.put(data.frame().getByteBuffer());
data.release();
if (data.frame().isEndStream())
latch.countDown();
else
stream.demand();
}
})
.thenAccept(s ->
{
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
s.data(new DataFrame(s.getId(), requestContent, true));
});
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
responseContent.put(data.frame().getByteBuffer());
data.release();
if (data.frame().isEndStream())
latch.countDown();
else
stream.demand();
}
})
.thenAccept(s ->
{
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
s.data(new DataFrame(s.getId(), requestContent, true));
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -760,13 +718,14 @@ public abstract class FlowControlStrategyTest
assertArrayEquals(requestData, responseData);
}
@Test
public void testClientExceedingSessionWindow() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testClientExceedingSessionWindow(FlowControlStrategyType type) throws Exception
{
// On server, we don't consume the data.
List<Stream.Data> dataList = new ArrayList<>();
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -861,13 +820,14 @@ public abstract class FlowControlStrategyTest
dataList.forEach(Stream.Data::release);
}
@Test
public void testClientExceedingStreamWindow() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testClientExceedingStreamWindow(FlowControlStrategyType type) throws Exception
{
// On server, we don't consume the data.
List<Stream.Data> dataList = new ArrayList<>();
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
@ -966,11 +926,12 @@ public abstract class FlowControlStrategyTest
dataList.forEach(Stream.Data::release);
}
@Test
public void testFlowControlWhenServerResetsStream() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testFlowControlWhenServerResetsStream(FlowControlStrategyType type) throws Exception
{
// On server, don't consume the data and immediately reset.
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -1034,11 +995,11 @@ public abstract class FlowControlStrategyTest
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testNoWindowUpdateForRemotelyClosedStream() throws Exception
@ParameterizedTest
@EnumSource(FlowControlStrategyType.class)
public void testNoWindowUpdateForRemotelyClosedStream(FlowControlStrategyType type) throws Exception
{
List<Stream.Data> dataList = new ArrayList<>();
start(new ServerSessionListener()
start(type, new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@ -1050,18 +1011,11 @@ public abstract class FlowControlStrategyTest
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
dataList.add(data);
if (data.frame().isEndStream())
{
// Release the Data when the stream is already remotely closed.
dataList.forEach(Stream.Data::release);
MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
else
{
stream.demand();
}
data.release();
boolean last = data.frame().isEndStream();
int status = last ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500;
MetaData.Response response = new MetaData.Response(status, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
};
}
@ -1069,7 +1023,7 @@ public abstract class FlowControlStrategyTest
List<WindowUpdateFrame> sessionWindowUpdates = new ArrayList<>();
List<WindowUpdateFrame> streamWindowUpdates = new ArrayList<>();
client.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(0.5F)
client.setFlowControlStrategyFactory(() -> new SimpleFlowControlStrategy()
{
@Override
public void onWindowUpdate(Session session, Stream stream, WindowUpdateFrame frame)
@ -1098,12 +1052,24 @@ public abstract class FlowControlStrategyTest
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1);
// Write a small DATA frame so the server only performs 1 readData().
ByteBuffer data = ByteBuffer.allocate(1);
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(sessionWindowUpdates.size() > 0);
int sessionUpdates = switch (type)
{
case SIMPLE -> 1;
// For small writes, session updates are buffered.
case BUFFERING -> 0;
};
assertEquals(sessionUpdates, sessionWindowUpdates.size());
assertEquals(0, streamWindowUpdates.size());
}
public enum FlowControlStrategyType
{
SIMPLE, BUFFERING
}
}

View File

@ -1,26 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.tests;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
public class SimpleFlowControlStrategyTest extends FlowControlStrategyTest
{
@Override
protected FlowControlStrategy newFlowControlStrategy()
{
return new SimpleFlowControlStrategy();
}
}

View File

@ -28,10 +28,13 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -52,6 +55,7 @@ import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
@ -67,6 +71,8 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@ -84,6 +90,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientStreamTest extends AbstractTest
{
private static final Logger LOG = LoggerFactory.getLogger(HttpClientStreamTest.class);
@ParameterizedTest
@MethodSource("transports")
public void testFileUpload(Transport transport) throws Exception
@ -500,7 +508,7 @@ public class HttpClientStreamTest extends AbstractTest
public int read()
{
// Will eventually throw ArrayIndexOutOfBounds
return data[index++];
return data[index++] & 0xFF;
}
}, data.length / 2))
.timeout(5, TimeUnit.SECONDS)
@ -1156,8 +1164,10 @@ public class HttpClientStreamTest extends AbstractTest
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
processCount.incrementAndGet();
processLatch.await(timeoutInSeconds * 2, TimeUnit.SECONDS);
callback.succeeded();
if (processLatch.await(timeoutInSeconds * 2, TimeUnit.SECONDS))
callback.succeeded();
else
callback.failed(new TimeoutException());
return true;
}
});
@ -1284,6 +1294,112 @@ public class HttpClientStreamTest extends AbstractTest
assertInstanceOf(IllegalCallerException.class, throwable);
}
@ParameterizedTest
@MethodSource("transportsNoFCGI")
@Tag("DisableLeakTracking:server:UNIX_DOMAIN")
@Tag("DisableLeakTracking:server:HTTP")
@Tag("DisableLeakTracking:server:HTTPS")
public void testUploadWithRetainedData(Transport transport) throws Exception
{
// TODO: broken for FCGI, investigate.
List<Content.Chunk> chunks = new CopyOnWriteArrayList<>();
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
callback.completeWith(new CompletableTask<>()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
completeExceptionally(chunk.getFailure());
return;
}
if (chunk.hasRemaining())
{
ByteBuffer byteBuffer = chunk.getByteBuffer();
if (chunk.canRetain())
{
chunk.retain();
chunks.add(Content.Chunk.asChunk(byteBuffer.slice(), chunk.isLast(), chunk));
}
else
{
chunks.add(Content.Chunk.from(BufferUtil.copy(byteBuffer), chunk.isLast()));
}
if (chunks.size() % 100 == 0)
dumpChunks(chunks);
BufferUtil.clear(byteBuffer);
}
chunk.release();
if (chunk.isLast())
{
complete(null);
return;
}
}
}
}.start());
return true;
}
});
byte[] data = new byte[10 * 1024 * 1024];
new Random().nextBytes(data);
CountDownLatch latch = new CountDownLatch(1);
ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data));
new CompletableResponseListener(client.newRequest(newURI(transport)).body(content))
.send()
.whenComplete((r, t) ->
{
if (t != null)
t.printStackTrace();
else if (r.getStatus() == 200)
latch.countDown();
});
assertTrue(latch.await(30, TimeUnit.SECONDS));
try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator())
{
for (Content.Chunk c : chunks)
{
ByteBuffer byteBuffer = c.getByteBuffer();
accumulator.copyBuffer(byteBuffer);
BufferUtil.clear(byteBuffer);
c.release();
}
assertArrayEquals(data, accumulator.toByteArray());
}
}
private void dumpChunks(List<Content.Chunk> chunks)
{
long accumulated = 0L;
for (Content.Chunk chunk : chunks)
{
accumulated += chunk.remaining();
}
LOG.info("Accumulated {} chunks totalling {} bytes", chunks.size(), accumulated);
}
private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
}