Issue #4400 - Review HttpClient's ContentProvider.

Review updates.
* Updated AbstractRequestContent (and subclasses) failure handling.
* Updated MultiPartRequestContent failure handling.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-03-27 15:33:21 +01:00
parent 0f2ddc8c9f
commit 708115f609
10 changed files with 62 additions and 99 deletions

View File

@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
/**
* <p>Partial implementation of {@link Request.Content}.</p>
* <p>Manages a single subscription at a time (multiple simultaneous subscriptions are not allowed).</p>
*/
public abstract class AbstractRequestContent implements Request.Content
{
@ -37,7 +36,6 @@ public abstract class AbstractRequestContent implements Request.Content
private final AutoLock lock = new AutoLock();
private final String contentType;
private Throwable failure;
protected AbstractRequestContent(String contentType)
{
@ -53,28 +51,13 @@ public abstract class AbstractRequestContent implements Request.Content
@Override
public Subscription subscribe(Consumer consumer, boolean emitInitialContent)
{
Throwable failure;
try (AutoLock ignored = lock.lock())
{
failure = this.failure;
}
Subscription subscription = newSubscription(consumer, emitInitialContent, failure);
Subscription subscription = newSubscription(consumer, emitInitialContent);
if (LOG.isDebugEnabled())
LOG.debug("Content subscription for {}: {}", subscription, consumer);
return subscription;
}
protected abstract Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure);
@Override
public void fail(Throwable failure)
{
try (AutoLock ignored = lock.lock())
{
if (this.failure == null)
this.failure = failure;
}
}
protected abstract Subscription newSubscription(Consumer consumer, boolean emitInitialContent);
/**
* <p>Partial implementation of {@code Subscription}.</p>
@ -91,11 +74,10 @@ public abstract class AbstractRequestContent implements Request.Content
// Whether the first content has been produced.
private boolean committed;
public AbstractSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
public AbstractSubscription(Consumer consumer, boolean emitInitialContent)
{
this.consumer = consumer;
this.emitInitialContent = emitInitialContent;
this.failure = failure;
this.stalled = true;
}

View File

@ -63,18 +63,18 @@ public class ByteBufferRequestContent extends AbstractRequestContent
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new SubscriptionImpl(consumer, emitInitialContent, failure);
return new SubscriptionImpl(consumer, emitInitialContent);
}
private class SubscriptionImpl extends AbstractSubscription
{
private int index;
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override

View File

@ -60,18 +60,18 @@ public class BytesRequestContent extends AbstractRequestContent
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new SubscriptionImpl(consumer, emitInitialContent, failure);
return new SubscriptionImpl(consumer, emitInitialContent);
}
private class SubscriptionImpl extends AbstractSubscription
{
private int index;
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override

View File

@ -42,6 +42,7 @@ public class InputStreamRequestContent extends AbstractRequestContent
private final InputStream stream;
private final int bufferSize;
private Subscription subscription;
public InputStreamRequestContent(InputStream stream)
{
@ -66,9 +67,11 @@ public class InputStreamRequestContent extends AbstractRequestContent
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new SubscriptionImpl(consumer, emitInitialContent, failure);
if (subscription != null)
throw new IllegalStateException("Multiple subscriptions not supported on " + this);
return subscription = new SubscriptionImpl(consumer, emitInitialContent);
}
@Override
@ -96,9 +99,9 @@ public class InputStreamRequestContent extends AbstractRequestContent
{
private boolean terminated;
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override

View File

@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpField;
@ -86,7 +85,8 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
private final ByteBuffer onlyBoundary;
private final ByteBuffer lastBoundary;
private long length;
private volatile boolean closed;
private boolean closed;
private Subscription subscription;
public MultiPartRequestContent()
{
@ -114,11 +114,22 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
if (closed)
length = calculateLength();
return new SubscriptionImpl(consumer, emitInitialContent, failure);
if (!closed)
throw new IllegalStateException("MultiPartRequestContent must be closed before sending the request");
if (subscription != null)
throw new IllegalStateException("Multiple subscriptions not supported on " + this);
length = calculateLength();
return subscription = new SubscriptionImpl(consumer, emitInitialContent);
}
@Override
public void fail(Throwable failure)
{
parts.stream()
.map(part -> part.content)
.forEach(content -> content.fail(failure));
}
/**
@ -284,9 +295,9 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
private int index;
private Subscription subscription;
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override
@ -371,10 +382,6 @@ public class MultiPartRequestContent extends AbstractRequestContent implements C
{
if (subscription != null)
subscription.fail(failure);
IntStream.range(index, parts.size())
.mapToObj(parts::get)
.map(part -> part.content)
.forEach(content -> content.fail(failure));
}
}

View File

@ -114,9 +114,9 @@ public class PathRequestContent extends AbstractRequestContent
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new SubscriptionImpl(consumer, emitInitialContent, failure);
return new SubscriptionImpl(consumer, emitInitialContent);
}
private class SubscriptionImpl extends AbstractSubscription
@ -124,9 +124,9 @@ public class PathRequestContent extends AbstractRequestContent
private ReadableByteChannel channel;
private long readTotal;
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
private SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override
@ -166,5 +166,12 @@ public class PathRequestContent extends AbstractRequestContent
if (bufferPool != null)
bufferPool.release(buffer);
}
@Override
public void fail(Throwable failure)
{
super.fail(failure);
IO.close(channel);
}
}
}

View File

@ -819,18 +819,18 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
}
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new SubscriptionImpl(consumer, emitInitialContent, failure);
return new SubscriptionImpl(consumer, emitInitialContent);
}
private class SubscriptionImpl extends AbstractSubscription
{
private int index;
public SubscriptionImpl(Consumer consumer, boolean emitInitialContent, Throwable failure)
public SubscriptionImpl(Consumer consumer, boolean emitInitialContent)
{
super(consumer, emitInitialContent, failure);
super(consumer, emitInitialContent);
}
@Override

View File

@ -631,9 +631,9 @@ public class HttpClientTest extends AbstractHttpClientServerTest
.body(new AbstractRequestContent("application/octet-stream")
{
@Override
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent, Throwable failure)
protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent)
{
return new AbstractSubscription(consumer, emitInitialContent, failure)
return new AbstractSubscription(consumer, emitInitialContent)
{
private int count;

View File

@ -408,6 +408,8 @@ public class MultiPartContentTest extends AbstractHttpClientServerTest
multiPart.addFieldPart("field", fieldContent, null);
AsyncRequestContent fileContent = new AsyncRequestContent();
multiPart.addFilePart("file", "fileName", fileContent, null);
multiPart.close();
CountDownLatch responseLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
@ -432,8 +434,6 @@ public class MultiPartContentTest extends AbstractHttpClientServerTest
fieldContent.offer(encoding.encode(value));
fieldContent.close();
multiPart.close();
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}

View File

@ -25,7 +25,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -39,7 +38,6 @@ import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -48,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class RequestContentBehaviorTest
{
@ -171,6 +170,7 @@ public class RequestContentBehaviorTest
{
{
addFieldPart("field", new StringRequestContent("*".repeat(64)), null);
close();
}
},
new PathRequestContent(smallFile),
@ -229,41 +229,6 @@ public class RequestContentBehaviorTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@MethodSource("smallContents")
public void testSmallContentFailedBeforeSubscription(Request.Content content)
{
Throwable testFailure = new Throwable("test_failure");
content.fail(testFailure);
AtomicInteger notified = new AtomicInteger();
AtomicReference<Throwable> failureRef = new AtomicReference<>();
Request.Content.Subscription subscription = content.subscribe(new Request.Content.Consumer()
{
@Override
public void onContent(ByteBuffer buffer, boolean last, Callback callback)
{
notified.getAndIncrement();
}
@Override
public void onFailure(Throwable error)
{
testFailure.addSuppressed(new Throwable("suppressed"));
failureRef.compareAndSet(null, error);
}
}, true);
// Initial demand.
subscription.demand();
assertEquals(0, notified.get());
Throwable failure = failureRef.get();
assertNotNull(failure);
assertSame(testFailure, failure);
assertEquals(1, failure.getSuppressed().length);
}
@ParameterizedTest
@MethodSource("smallContents")
public void testSmallContentFailedAfterFirstDemand(Request.Content content)
@ -343,12 +308,11 @@ public class RequestContentBehaviorTest
assertEquals(1, failure.getSuppressed().length);
}
@Test
public void testSameContentMultipleSubscriptions() throws Exception
@ParameterizedTest
@MethodSource("smallContents")
public void testReproducibleContentCanHaveMultipleSubscriptions(Request.Content content) throws Exception
{
byte[] bytes = new byte[64];
new Random().nextBytes(bytes);
Request.Content content = new BytesRequestContent(bytes);
assumeTrue(content.isReproducible());
CountDownLatch latch1 = new CountDownLatch(1);
Request.Content.Subscription subscription1 = content.subscribe((buffer, last, callback) ->