#10543 handle review comments

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2023-09-22 16:01:12 +02:00
parent b25dc8183b
commit 5c9d796a49
2 changed files with 48 additions and 42 deletions

View File

@ -162,7 +162,7 @@ public class AbstractTest
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue); isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue);
if (disabled) if (disabled)
{ {
System.err.println("Not tracking leaks"); System.err.println("Not tracking " + tagSubValue + " leaks");
return true; return true;
} }
@ -172,7 +172,7 @@ public class AbstractTest
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + transportName); isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + transportName);
if (disabled) if (disabled)
{ {
System.err.println("Not tracking leaks for transport " + transportName); System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName);
return true; return true;
} }
} }
@ -181,7 +181,7 @@ public class AbstractTest
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue); isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue);
if (disabled) if (disabled)
{ {
System.err.println("Not tracking leaks for " + tagSubValue); System.err.println("Not tracking " + tagSubValue + " leaks");
return true; return true;
} }
@ -191,7 +191,7 @@ public class AbstractTest
isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue + ":" + transportName); isAnnotatedWithTagValue(testInfo.getTestClass().orElseThrow(), disableLeakTrackingTagValue + ":" + tagSubValue + ":" + transportName);
if (disabled) if (disabled)
{ {
System.err.println("Not tracking leaks for " + tagSubValue + " using transport " + transportName); System.err.println("Not tracking " + tagSubValue + " leaks for transport " + transportName);
return true; return true;
} }
} }

View File

@ -32,7 +32,6 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -59,6 +58,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CompletableTask;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
@ -1200,81 +1200,87 @@ public class HttpClientStreamTest extends AbstractTest
@ParameterizedTest @ParameterizedTest
@MethodSource("transports") @MethodSource("transports")
@Tag("DisableLeakTracking:server") @Tag("DisableLeakTracking")
public void testHttpStreamConsumeAvailableUponClientTimeout(Transport transport) throws Exception public void testHttpStreamConsumeAvailableUponClientTimeout(Transport transport) throws Exception
{ {
AtomicReference<org.eclipse.jetty.client.Request> clientRequestRef = new AtomicReference<>();
start(transport, new Handler.Abstract() start(transport, new Handler.Abstract()
{ {
@Override @Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{ {
// Consume the uploaded data very slowly to make the client timeout. new CompletableTask<>()
new Runnable()
{ {
@Override @Override
public void run() public void run()
{ {
while (true) while (true)
{ {
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
completeExceptionally(chunk.getFailure());
return;
}
chunk.release();
if (chunk.isLast())
{
complete(null);
return;
}
org.eclipse.jetty.client.Request r = clientRequestRef.getAndSet(null);
if (r != null)
{
// Abort the client request then give some time for the client's
// abort notification (e.g.: reset frame) to reach the server.
r.abort(new IllegalCallerException());
try try
{ {
Thread.sleep(100); Thread.sleep(100);
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {
// ignore completeExceptionally(e);
}
Content.Chunk chunk = request.read();
if (chunk == null)
{
request.demand(this);
return; return;
} }
if (Content.Chunk.isFailure(chunk))
{
callback.failed(chunk.getFailure());
return;
} }
}
chunk.release(); }
}
if (chunk.isLast()) .start()
.whenComplete((result, failure) ->
{ {
if (failure == null)
callback.succeeded(); callback.succeeded();
return; else
} callback.failed(failure);
} });
}
}.run();
return true; return true;
} }
}); });
// Upload a large amount of data to the server with a timeout small enough
// that the client will timeout during the transfer.
byte[] data = new byte[16 * 1024 * 1024]; byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data); new Random().nextBytes(data);
ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data)); ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data));
CountDownLatch latch = new CountDownLatch(1); org.eclipse.jetty.client.Request request = client.newRequest(newURI(transport))
AtomicReference<Throwable> errorHolder = new AtomicReference<>(); .body(content);
long timeoutMs = switch (transport) clientRequestRef.set(request);
{ Throwable throwable = new CompletableResponseListener(request)
case H2, H2C -> 100;
default -> 1000;
};
new CompletableResponseListener(client.newRequest(newURI(transport)).body(content).timeout(timeoutMs, TimeUnit.MILLISECONDS))
.send() .send()
.whenComplete((r, t) -> .handle((r, t) -> t)
{ .get(5, TimeUnit.SECONDS);
errorHolder.set(t);
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS)); assertInstanceOf(IllegalCallerException.class, throwable);
assertInstanceOf(TimeoutException.class, errorHolder.get());
} }
private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback) private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)