#10543 record failure as chunk to avoid consumeAvailable to stack overflow

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2023-09-21 12:01:24 +02:00
parent 1707724289
commit 1286f91b1c
2 changed files with 86 additions and 0 deletions

View File

@ -528,6 +528,12 @@ public class HttpStreamOverHTTP3 implements HttpStream
public Runnable onFailure(Throwable failure)
{
try (AutoLock ignored = lock.lock())
{
if (chunk != null)
chunk.release();
chunk = Content.Chunk.from(failure, true);
}
return httpChannel.onFailure(failure);
}
}

View File

@ -32,6 +32,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -39,7 +40,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.ByteBufferRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.CompletableResponseListener;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.InputStreamRequestContent;
import org.eclipse.jetty.client.InputStreamResponseListener;
@ -73,6 +76,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -1194,6 +1198,82 @@ public class HttpClientStreamTest extends AbstractTest
assertTrue(clientLatch.await(timeoutInSeconds, TimeUnit.SECONDS));
}
@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:server")
public void testUploadWithRetainedData(Transport transport) throws Exception
{
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
new Runnable()
{
@Override
public void run()
{
while (true)
{
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
// ignore
}
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
callback.failed(chunk.getFailure());
return;
}
chunk.release();
if (chunk.isLast())
{
callback.succeeded();
return;
}
}
}
}.run();
return true;
}
});
byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> errorHolder = new AtomicReference<>();
long timeoutMs = switch (transport)
{
case H2, H2C -> 100;
default -> 1000;
};
new CompletableResponseListener(client.newRequest(newURI(transport)).body(content).timeout(timeoutMs, TimeUnit.MILLISECONDS))
.send()
.whenComplete((r, t) ->
{
errorHolder.set(t);
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertInstanceOf(TimeoutException.class, errorHolder.get());
}
private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
}