Async exec runtime and connection management improvements

This commit is contained in:
Oleg Kalnichevski 2017-11-12 14:40:45 +01:00
parent d992bec7ad
commit 79c153409b
4 changed files with 109 additions and 111 deletions

View File

@ -58,9 +58,9 @@ public interface AsyncExecRuntime {
void discardConnection();
boolean isConnected();
boolean validateConnection();
void disconnect();
boolean isConnected();
void connect(
HttpClientContext clientContext,
@ -75,16 +75,8 @@ public interface AsyncExecRuntime {
AsyncClientExchangeHandler exchangeHandler,
HttpClientContext context);
boolean validateConnection();
boolean isConnectionReusable();
void markConnectionReusable();
void markConnectionReusable(Object state, TimeValue duration);
void markConnectionNonReusable();
void setConnectionState(Object state);
void setConnectionValidFor(TimeValue duration);
}

View File

@ -108,6 +108,21 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
}
}
private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
try {
endpoint.shutdown();
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": discarding endpoint");
}
} catch (final IOException ex) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
}
} finally {
manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
}
}
@Override
public void releaseConnection() {
final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
@ -118,18 +133,7 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
}
manager.release(endpoint, state, validDuration);
} else {
try {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": releasing invalid endpoint");
}
endpoint.close();
} catch (final IOException ex) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
}
} finally {
manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
}
discardEndpoint(endpoint);
}
}
}
@ -138,19 +142,22 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
public void discardConnection() {
final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
if (endpoint != null) {
try {
endpoint.shutdown();
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": discarding endpoint");
}
} catch (final IOException ex) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
}
} finally {
manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
discardEndpoint(endpoint);
}
}
@Override
public boolean validateConnection() {
if (reusable) {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
return endpoint != null && endpoint.isConnected();
} else {
final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
if (endpoint != null) {
discardEndpoint(endpoint);
}
}
return false;
}
AsyncConnectionEndpoint ensureValid() {
@ -167,22 +174,6 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
return endpoint != null && endpoint.isConnected();
}
@Override
public void disconnect() {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
if (endpoint != null) {
try {
endpoint.close();
} catch (final IOException ex) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
}
discardConnection();
}
}
}
@Override
public void connect(
final HttpClientContext context,
@ -269,34 +260,17 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
}
@Override
public boolean validateConnection() {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
return endpoint != null && endpoint.isConnected();
}
@Override
public boolean isConnectionReusable() {
return reusable;
}
@Override
public void markConnectionReusable() {
public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
reusable = true;
state = newState;
validDuration = newValidDuration;
}
@Override
public void markConnectionNonReusable() {
reusable = false;
}
@Override
public void setConnectionState(final Object state) {
this.state = state;
}
@Override
public void setConnectionValidFor(final TimeValue duration) {
validDuration = duration;
state = null;
validDuration = null;
}
}

View File

@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
@ -45,6 +46,7 @@ import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
@ -84,6 +86,7 @@ class AsyncMainClientExec implements AsyncExecChainHandler {
log.debug(exchangeId + ": executing " + new RequestLine(request));
}
final AtomicInteger messageCountDown = new AtomicInteger(2);
final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>(null);
@ -110,6 +113,9 @@ class AsyncMainClientExec implements AsyncExecChainHandler {
@Override
public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
channel.sendRequest(request, entityProducer);
if (entityProducer == null) {
messageCountDown.decrementAndGet();
}
}
@Override
@ -119,7 +125,35 @@ class AsyncMainClientExec implements AsyncExecChainHandler {
@Override
public void produce(final DataStreamChannel channel) throws IOException {
entityProducer.produce(channel);
entityProducer.produce(new DataStreamChannel() {
@Override
public void requestOutput() {
channel.requestOutput();
}
@Override
public int write(final ByteBuffer src) throws IOException {
return channel.write(src);
}
@Override
public void endStream(final List<? extends Header> trailers) throws IOException {
channel.endStream(trailers);
if (messageCountDown.decrementAndGet() <= 0) {
asyncExecCallback.completed();
}
}
@Override
public void endStream() throws IOException {
channel.endStream();
if (messageCountDown.decrementAndGet() <= 0) {
asyncExecCallback.completed();
}
}
});
}
@Override
@ -129,24 +163,21 @@ class AsyncMainClientExec implements AsyncExecChainHandler {
@Override
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
execRuntime.markConnectionReusable();
final TimeValue duration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
execRuntime.setConnectionValidFor(duration);
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
messageCountDown.decrementAndGet();
}
final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
Object userToken = clientContext.getUserToken();
if (userToken == null) {
userToken = userTokenHandler.getUserToken(route, clientContext);
clientContext.setAttribute(HttpClientContext.USER_TOKEN, userToken);
}
if (userToken != null) {
execRuntime.setConnectionState(userToken);
}
execRuntime.markConnectionReusable(userToken, keepAliveDuration);
if (entityDetails == null) {
if (!execRuntime.isConnectionReusable()) {
execRuntime.discardConnection();
} else {
execRuntime.validateConnection();
execRuntime.validateConnection();
if (messageCountDown.decrementAndGet() <= 0) {
asyncExecCallback.completed();
}
asyncExecCallback.completed();
}
}
@ -174,17 +205,13 @@ class AsyncMainClientExec implements AsyncExecChainHandler {
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
// Release connection early
execRuntime.releaseConnection();
entityConsumer.streamEnd(trailers);
} else {
if (!execRuntime.isConnectionReusable()) {
execRuntime.discardConnection();
} else {
execRuntime.validateConnection();
}
execRuntime.validateConnection();
}
if (messageCountDown.decrementAndGet() <= 0) {
asyncExecCallback.completed();
}
asyncExecCallback.completed();
}
};

View File

@ -33,6 +33,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
@ -183,6 +184,7 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
setupContext(clientContext);
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
final AtomicReference<T> resultRef = new AtomicReference<>(null);
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
execChain.execute(
RequestCopier.INSTANCE.copy(request),
@ -255,24 +257,26 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
outputTerminated.set(true);
requestProducer.releaseResources();
}
responseConsumer.consumeResponse(response, entityDetails, new FutureCallback<T>() {
responseConsumer.consumeResponse(response, entityDetails,
//TODO: eliminate this callback after upgrade to HttpCore 5.0b2
new FutureCallback<T>() {
@Override
public void completed(final T result) {
future.completed(result);
}
@Override
public void completed(final T result) {
resultRef.set(result);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
@Override
public void cancelled() {
future.cancel();
}
});
});
return responseConsumer;
}
@ -282,10 +286,11 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
log.debug(exchangeId + ": message exchange successfully completed");
}
try {
execRuntime.releaseConnection();
future.completed(resultRef.getAndSet(null));
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
} finally {
execRuntime.releaseConnection();
}
}
@ -295,15 +300,15 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
log.debug(exchangeId + ": request failed: " + cause.getMessage());
}
try {
execRuntime.discardConnection();
responseConsumer.failed(cause);
} finally {
try {
future.failed(cause);
responseConsumer.failed(cause);
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
}
} finally {
execRuntime.discardConnection();
}
}