AsyncConnectionEndpoint to support graceful and immediate close modes

This commit is contained in:
Oleg Kalnichevski 2018-10-17 14:25:20 +02:00
parent a1d6db5ec3
commit c446639820
3 changed files with 22 additions and 34 deletions

View File

@ -27,7 +27,6 @@
package org.apache.hc.client5.http.impl.async; package org.apache.hc.client5.http.impl.async;
import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -45,6 +44,7 @@ import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.ConnectionInitiator; import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -121,14 +121,10 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
private void discardEndpoint(final AsyncConnectionEndpoint endpoint) { private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
try { try {
endpoint.shutdown(); endpoint.close(CloseMode.IMMEDIATE);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": discarding endpoint"); log.debug(ConnPoolSupport.getId(endpoint) + ": discarding endpoint");
} }
} catch (final IOException ex) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": " + ex.getMessage(), ex);
}
} finally { } finally {
manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS); manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
} }

View File

@ -27,7 +27,6 @@
package org.apache.hc.client5.http.impl.nio; package org.apache.hc.client5.http.impl.nio;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -512,24 +511,13 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
} }
@Override @Override
public void shutdown() throws IOException { public void close(final CloseMode closeMode) {
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get(); final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
if (poolEntry != null) { if (poolEntry != null) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(id + ": shutdown " + CloseMode.IMMEDIATE); log.debug(id + ": shutdown " + closeMode);
} }
poolEntry.discardConnection(CloseMode.IMMEDIATE); poolEntry.discardConnection(closeMode);
}
}
@Override
public void close() throws IOException {
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
if (poolEntry != null) {
if (log.isDebugEnabled()) {
log.debug(id + ": shutdown " + CloseMode.GRACEFUL);
}
poolEntry.discardConnection(CloseMode.GRACEFUL);
} }
} }

View File

@ -27,7 +27,6 @@
package org.apache.hc.client5.http.nio; package org.apache.hc.client5.http.nio;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -43,6 +42,8 @@ import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler; import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
/** /**
* Client connection endpoint that can be used to execute message exchanges. * Client connection endpoint that can be used to execute message exchanges.
@ -50,20 +51,29 @@ import org.apache.hc.core5.http.protocol.HttpCoreContext;
* @since 5.0 * @since 5.0
*/ */
@Contract(threading = ThreadingBehavior.SAFE) @Contract(threading = ThreadingBehavior.SAFE)
public abstract class AsyncConnectionEndpoint implements Closeable { public abstract class AsyncConnectionEndpoint implements ModalCloseable {
public abstract void execute( public abstract void execute(
AsyncClientExchangeHandler exchangeHandler, AsyncClientExchangeHandler exchangeHandler,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory, HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context); HttpContext context);
public void execute( public abstract boolean isConnected();
public abstract void setSocketTimeout(int timeout);
@Override
public final void close() throws IOException {
close(CloseMode.GRACEFUL);
}
public final void execute(
final AsyncClientExchangeHandler exchangeHandler, final AsyncClientExchangeHandler exchangeHandler,
final HttpContext context) { final HttpContext context) {
execute(exchangeHandler, null, context); execute(exchangeHandler, null, context);
} }
public <T> Future<T> execute( public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@ -94,7 +104,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
return future; return future;
} }
public <T> Future<T> execute( public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context, final HttpContext context,
@ -102,7 +112,7 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
return execute(requestProducer, responseConsumer, null, context, callback); return execute(requestProducer, responseConsumer, null, context, callback);
} }
public <T> Future<T> execute( public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@ -110,17 +120,11 @@ public abstract class AsyncConnectionEndpoint implements Closeable {
return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback); return execute(requestProducer, responseConsumer, pushHandlerFactory, null, callback);
} }
public <T> Future<T> execute( public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) { final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, null, callback); return execute(requestProducer, responseConsumer, null, null, callback);
} }
public abstract boolean isConnected();
public abstract void setSocketTimeout(int timeout);
public abstract void shutdown() throws IOException;
} }