Issue #4400 - Review HttpClient's ContentProvider.

Review updates.
* Now AbstractRequestContent supports multiple subscriptions.
* Reviewed abort() path to fail the content and the subscription
  and notify FailureListener sequentially with other listeners.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-03-26 17:46:59 +01:00
parent 9fec1f43e0
commit 0f2ddc8c9f
6 changed files with 98 additions and 71 deletions

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.slf4j.Logger;
@ -237,6 +238,12 @@ public class HttpExchange
// We failed this exchange, deal with it.
// Applications could be blocked providing
// request content, notify them of the failure.
Request.Content body = request.getBody();
if (abortRequest && body != null)
body.fail(failure);
// Case #1: exchange was in the destination queue.
if (destination.remove(this))
{

View File

@ -824,11 +824,7 @@ public class HttpRequest implements Request
public boolean abort(Throwable cause)
{
if (aborted.compareAndSet(null, Objects.requireNonNull(cause)))
{
if (content != null)
content.fail(cause);
return conversation.abort(cause);
}
return false;
}

View File

@ -52,9 +52,9 @@ public abstract class HttpSender
private final ContentConsumer consumer = new ContentConsumer();
private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private final HttpChannel channel;
private Request.Content.Subscription subscription;
private Throwable failure;
protected HttpSender(HttpChannel channel)
{
@ -78,13 +78,6 @@ public abstract class HttpSender
public void send(HttpExchange exchange)
{
Request request = exchange.getRequest();
Request.Content body = request.getBody();
consumer.exchange = exchange;
consumer.expect100 = expects100Continue(request);
subscription = body.subscribe(consumer, !consumer.expect100);
if (!queuedToBegin(exchange))
return;
@ -110,10 +103,16 @@ public abstract class HttpSender
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
Request.Content body = request.getBody();
consumer.exchange = exchange;
consumer.expect100 = expects100Continue(request);
subscription = body.subscribe(consumer, !consumer.expect100);
if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
return true;
terminateRequest(exchange);
abortRequest(exchange);
return false;
}
@ -131,7 +130,7 @@ public abstract class HttpSender
if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
return true;
terminateRequest(exchange);
abortRequest(exchange);
return false;
}
@ -149,7 +148,7 @@ public abstract class HttpSender
if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
return true;
terminateRequest(exchange);
abortRequest(exchange);
return false;
}
@ -173,7 +172,7 @@ public abstract class HttpSender
if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
return true;
terminateRequest(exchange);
abortRequest(exchange);
return false;
}
default:
@ -264,13 +263,23 @@ public abstract class HttpSender
}
}
private void terminateRequest(HttpExchange exchange)
private void abortRequest(HttpExchange exchange)
{
// In abort(), the state is updated before the failure is recorded
// to avoid to overwrite it, so here we may read a null failure.
Throwable failure = this.failure;
if (failure == null)
failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
Throwable failure = this.failure.get();
if (subscription != null)
subscription.fail(failure);
dispose();
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request abort {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
}
@ -353,8 +362,11 @@ public abstract class HttpSender
public boolean abort(HttpExchange exchange, Throwable failure)
{
// Store only the first failure.
this.failure.compareAndSet(null, failure);
// Update the state to avoid more request processing.
boolean terminate;
boolean abort;
while (true)
{
RequestState current = requestState.get();
@ -366,28 +378,15 @@ public abstract class HttpSender
{
if (updateRequestState(current, RequestState.FAILURE))
{
terminate = current != RequestState.TRANSIENT;
abort = current != RequestState.TRANSIENT;
break;
}
}
}
this.failure = failure;
dispose();
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request abort {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
if (terminate)
if (abort)
{
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
abortRequest(exchange);
return true;
}
else

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.util;
import java.io.EOFException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.api.Request;
@ -38,7 +37,6 @@ public abstract class AbstractRequestContent implements Request.Content
private final AutoLock lock = new AutoLock();
private final String contentType;
private Subscription subscription;
private Throwable failure;
protected AbstractRequestContent(String contentType)
@ -55,20 +53,15 @@ public abstract class AbstractRequestContent implements Request.Content
@Override
public Subscription subscribe(Consumer consumer, boolean emitInitialContent)
{
Subscription oldSubscription;
Subscription newSubscription;
Throwable failure;
try (AutoLock ignored = lock.lock())
{
if (subscription != null && !isReproducible())
throw new IllegalStateException("Multiple subscriptions not supported on " + this);
oldSubscription = subscription;
newSubscription = subscription = newSubscription(consumer, emitInitialContent, failure);
failure = this.failure;
}
if (oldSubscription != null)
oldSubscription.fail(new EOFException("Content replay"));
Subscription subscription = newSubscription(consumer, emitInitialContent, failure);
if (LOG.isDebugEnabled())
LOG.debug("Content subscription for {}: {}", this, consumer);
return newSubscription;
LOG.debug("Content subscription for {}: {}", subscription, consumer);
return subscription;
}
protected abstract Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure);
@ -76,17 +69,11 @@ public abstract class AbstractRequestContent implements Request.Content
@Override
public void fail(Throwable failure)
{
Subscription subscription = null;
try (AutoLock ignored = lock.lock())
{
if (this.failure == null)
{
this.failure = failure;
subscription = this.subscription;
}
}
if (subscription != null)
subscription.fail(failure);
}
/**

View File

@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpField;
@ -115,7 +116,8 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
{
length = calculateLength();
if (closed)
length = calculateLength();
return new SubscriptionImpl(consumer, emitInitialContent, failure);
}
@ -367,7 +369,10 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
@Override
public void onFailure(Throwable failure)
{
parts.stream()
if (subscription != null)
subscription.fail(failure);
IntStream.range(index, parts.size())
.mapToObj(parts::get)
.map(part -> part.content)
.forEach(content -> content.fail(failure));
}

View File

@ -23,9 +23,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -39,6 +39,7 @@ import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -56,21 +57,23 @@ public class RequestContentBehaviorTest
@BeforeAll
public static void prepare() throws IOException
{
emptyFile = MavenTestingUtils.getTargetTestingPath().resolve("empty.txt");
Files.newOutputStream(emptyFile, StandardOpenOption.CREATE).close();
smallFile = MavenTestingUtils.getTargetTestingPath().resolve("small.txt");
try (var s = Files.newOutputStream(smallFile, StandardOpenOption.CREATE))
{
byte[] bytes = new byte[64];
Arrays.fill(bytes, (byte)'#');
s.write(bytes);
}
Path testPath = MavenTestingUtils.getTargetTestingPath();
Files.createDirectories(testPath);
emptyFile = testPath.resolve("empty.txt");
Files.write(emptyFile, new byte[0]);
smallFile = testPath.resolve("small.txt");
byte[] bytes = new byte[64];
Arrays.fill(bytes, (byte)'#');
Files.write(smallFile, bytes);
}
@AfterAll
public static void dispose() throws IOException
{
Files.delete(emptyFile);
if (smallFile != null)
Files.delete(smallFile);
if (emptyFile != null)
Files.delete(emptyFile);
}
public static List<Request.Content> emptyContents() throws IOException
@ -290,7 +293,7 @@ public class RequestContentBehaviorTest
assertEquals(1, notified.get());
content.fail(testFailure);
subscription.fail(testFailure);
subscription.demand();
assertEquals(1, notified.get());
@ -339,4 +342,34 @@ public class RequestContentBehaviorTest
assertNotNull(failure);
assertEquals(1, failure.getSuppressed().length);
}
@Test
public void testSameContentMultipleSubscriptions() throws Exception
{
byte[] bytes = new byte[64];
new Random().nextBytes(bytes);
Request.Content content = new BytesRequestContent(bytes);
CountDownLatch latch1 = new CountDownLatch(1);
Request.Content.Subscription subscription1 = content.subscribe((buffer, last, callback) ->
{
if (last)
latch1.countDown();
}, true);
CountDownLatch latch2 = new CountDownLatch(1);
Request.Content.Subscription subscription2 = content.subscribe((buffer, last, callback) ->
{
if (last)
latch2.countDown();
}, true);
// Initial demand.
subscription1.demand();
assertTrue(latch1.await(5, TimeUnit.SECONDS));
// Initial demand.
subscription2.demand();
assertTrue(latch2.await(5, TimeUnit.SECONDS));
}
}