Improved internal state representation of the internal async execution runtime in order to present potential race conditions

This commit is contained in:
Oleg Kalnichevski 2025-01-09 22:09:05 +01:00
parent ff87bf0075
commit 2f6694fcf3
1 changed files with 20 additions and 15 deletions

View File

@ -54,6 +54,18 @@ import org.slf4j.Logger;
class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
static class ReUseData {
final Object state;
final TimeValue validDuration;
ReUseData(final Object state, final TimeValue validDuration) {
this.state = state;
this.validDuration = validDuration;
}
}
private final Logger log;
private final AsyncClientConnectionManager manager;
private final ConnectionInitiator connectionInitiator;
@ -64,9 +76,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Deprecated
private final TlsConfig tlsConfig;
private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
private volatile boolean reusable;
private volatile Object state;
private volatile TimeValue validDuration;
private final AtomicReference<ReUseData> reuseDataRef;
InternalHttpAsyncExecRuntime(
final Logger log,
@ -81,7 +91,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
this.pushHandlerFactory = pushHandlerFactory;
this.tlsConfig = tlsConfig;
this.endpointRef = new AtomicReference<>();
this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
this.reuseDataRef = new AtomicReference<>();
}
@Override
@ -97,7 +107,6 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
final HttpClientContext context,
final FutureCallback<AsyncExecRuntime> callback) {
if (endpointRef.get() == null) {
state = object;
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
if (log.isDebugEnabled()) {
@ -113,7 +122,6 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
endpointRef.set(connectionEndpoint);
reusable = connectionEndpoint.isConnected();
if (log.isDebugEnabled()) {
log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
}
@ -153,11 +161,12 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
public void releaseEndpoint() {
final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
if (endpoint != null) {
if (reusable) {
final ReUseData reUseData = reuseDataRef.getAndSet(null);
if (reUseData != null) {
if (log.isDebugEnabled()) {
log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
}
manager.release(endpoint, state, validDuration);
manager.release(endpoint, reUseData.state, reUseData.validDuration);
} else {
discardEndpoint(endpoint);
}
@ -174,7 +183,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public boolean validateConnection() {
if (reusable) {
if (reuseDataRef != null) {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
return endpoint != null && endpoint.isConnected();
}
@ -325,16 +334,12 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
reusable = true;
state = newState;
validDuration = newValidDuration;
reuseDataRef.set(new ReUseData(newState, newValidDuration));
}
@Override
public void markConnectionNonReusable() {
reusable = false;
state = null;
validDuration = null;
reuseDataRef.set(null);
}
@Override