HTTPASYNC-160: HttpAsyncClient in INACTIVE or STOPPED state throws a IllegalStateException causing the current thread to terminate

This commit is contained in:
Oleg Kalnichevski 2020-04-18 12:08:10 +02:00
parent 12ec6f15ea
commit 177fc804e5
4 changed files with 24 additions and 17 deletions

View File

@ -81,13 +81,8 @@ abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
pushConsumerRegistry.register(hostname, uriPattern, supplier); pushConsumerRegistry.register(hostname, uriPattern, supplier);
} }
void ensureRunning() { boolean isRunning() {
switch (status.get()) { return status.get() == Status.RUNNING;
case READY:
throw new IllegalStateException("Client is not running");
case TERMINATED:
throw new IllegalStateException("Client has been terminated");
}
} }
ConnectionInitiator getConnectionInitiator() { ConnectionInitiator getConnectionInitiator() {

View File

@ -30,6 +30,7 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -150,9 +151,11 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context, final HttpContext context,
final FutureCallback<T> callback) { final FutureCallback<T> callback) {
ensureRunning();
final ComplexFuture<T> future = new ComplexFuture<>(callback); final ComplexFuture<T> future = new ComplexFuture<>(callback);
try { try {
if (!isRunning()) {
throw new CancellationException("Request execution cancelled");
}
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create(); final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
requestProducer.sendRequest(new RequestChannel() { requestProducer.sendRequest(new RequestChannel() {
@ -316,7 +319,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
} }
}, context); }, context);
} catch (final HttpException | IOException ex) { } catch (final HttpException | IOException | IllegalStateException ex) {
future.failed(ex); future.failed(ex);
} }
return future; return future;

View File

@ -30,6 +30,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -134,10 +135,12 @@ public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBa
final AsyncClientExchangeHandler exchangeHandler, final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) { final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable(); final ComplexCancellable cancellable = new ComplexCancellable();
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
try { try {
if (!isRunning()) {
throw new CancellationException("Request execution cancelled");
}
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
exchangeHandler.produceRequest(new RequestChannel() { exchangeHandler.produceRequest(new RequestChannel() {
@Override @Override
@ -270,7 +273,7 @@ public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBa
} }
}, context); }, context);
} catch (final HttpException | IOException ex) { } catch (final HttpException | IOException | IllegalStateException ex) {
exchangeHandler.failed(ex); exchangeHandler.failed(ex);
} }
return cancellable; return cancellable;

View File

@ -29,6 +29,7 @@ package org.apache.hc.client5.http.impl.async;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -209,12 +210,15 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
final FutureCallback<AsyncClientEndpoint> callback) { final FutureCallback<AsyncClientEndpoint> callback) {
Args.notNull(host, "Host"); Args.notNull(host, "Host");
Args.notNull(context, "HTTP context"); Args.notNull(context, "HTTP context");
ensureRunning(); final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
if (!isRunning()) {
future.failed(new CancellationException("Connection lease cancelled"));
return future;
}
final HttpClientContext clientContext = HttpClientContext.adapt(context); final HttpClientContext clientContext = HttpClientContext.adapt(context);
final RequestConfig requestConfig = clientContext.getRequestConfig(); final RequestConfig requestConfig = clientContext.getRequestConfig();
final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout(); final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
final Timeout connectTimeout = requestConfig.getConnectTimeout(); final Timeout connectTimeout = requestConfig.getConnectTimeout();
final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
leaseEndpoint( leaseEndpoint(
host, host,
connectionRequestTimeout, connectionRequestTimeout,
@ -246,10 +250,12 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
final AsyncClientExchangeHandler exchangeHandler, final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) { final HttpContext context) {
ensureRunning();
final ComplexCancellable cancellable = new ComplexCancellable(); final ComplexCancellable cancellable = new ComplexCancellable();
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
try { try {
if (!isRunning()) {
throw new CancellationException("Request execution cancelled");
}
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
exchangeHandler.produceRequest(new RequestChannel() { exchangeHandler.produceRequest(new RequestChannel() {
@Override @Override
@ -426,7 +432,7 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
} }
}, context); }, context);
} catch (final HttpException | IOException ex) { } catch (final HttpException | IOException | IllegalStateException ex) {
exchangeHandler.failed(ex); exchangeHandler.failed(ex);
} }
return cancellable; return cancellable;