Always bind the exchange ID to the execution context

This commit is contained in:
Oleg Kalnichevski 2021-09-27 14:35:24 +02:00
parent 013851d898
commit 5390aef223
7 changed files with 14 additions and 4 deletions

View File

@ -656,6 +656,7 @@ private void handleCacheHit(
try { try {
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now); final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, entry, now);
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
context.setExchangeId(exchangeId);
final AsyncExecChain.Scope fork = new AsyncExecChain.Scope( final AsyncExecChain.Scope fork = new AsyncExecChain.Scope(
exchangeId, exchangeId,
scope.route, scope.route,

View File

@ -278,6 +278,7 @@ private ClassicHttpResponse handleCacheHit(
&& validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) { && validityPolicy.mayReturnStaleWhileRevalidating(entry, now)) {
LOG.debug("Serving stale with asynchronous revalidation"); LOG.debug("Serving stale with asynchronous revalidation");
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
context.setExchangeId(exchangeId);
final ExecChain.Scope fork = new ExecChain.Scope( final ExecChain.Scope fork = new ExecChain.Scope(
exchangeId, exchangeId,
scope.route, scope.route,

View File

@ -188,12 +188,12 @@ protected <T> Future<T> doExecute(
httpHost != null ? httpHost : RoutingSupport.determineHost(request), httpHost != null ? httpHost : RoutingSupport.determineHost(request),
clientContext); clientContext);
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
clientContext.setExchangeId(exchangeId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} preparing request execution", exchangeId); LOG.debug("{} preparing request execution", exchangeId);
} }
final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory); final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
clientContext.setExchangeId(exchangeId);
setupContext(clientContext); setupContext(clientContext);
final AsyncExecChain.Scheduler scheduler = this::executeScheduled; final AsyncExecChain.Scheduler scheduler = this::executeScheduled;

View File

@ -213,7 +213,10 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio
}; };
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session)); clientContext.setExchangeId(exchangeId);
if (LOG.isDebugEnabled()) {
LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(session));
}
session.enqueue( session.enqueue(
new RequestExecutionCommand( new RequestExecutionCommand(
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler), new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),

View File

@ -136,6 +136,7 @@ private Future<AsyncConnectionEndpoint> leaseEndpoint(
final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver)); final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback); final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
clientContext.setExchangeId(exchangeId);
final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease( final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
exchangeId, exchangeId,
route, route,
@ -445,16 +446,18 @@ public void execute(
final HttpContext context) { final HttpContext context) {
Asserts.check(!released.get(), "Endpoint has already been released"); Asserts.check(!released.get(), "Endpoint has already been released");
final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
clientContext.setExchangeId(exchangeId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint)); LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
connectionEndpoint.execute( connectionEndpoint.execute(
exchangeId, exchangeId,
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler), new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
pushHandlerFactory, pushHandlerFactory,
context); clientContext);
} else { } else {
connectionEndpoint.execute(exchangeId, exchangeHandler, context); connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
} }
} }

View File

@ -159,6 +159,7 @@ protected CloseableHttpResponse doExecute(
target != null ? target : RoutingSupport.determineHost(request), target != null ? target : RoutingSupport.determineHost(request),
localcontext); localcontext);
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
localcontext.setExchangeId(exchangeId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} preparing request execution", exchangeId); LOG.debug("{} preparing request execution", exchangeId);
} }

View File

@ -132,6 +132,7 @@ protected CloseableHttpResponse doExecute(
final HttpRoute route = new HttpRoute(RoutingSupport.normalize(target, schemePortResolver)); final HttpRoute route = new HttpRoute(RoutingSupport.normalize(target, schemePortResolver));
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
clientContext.setExchangeId(exchangeId);
final ExecRuntime execRuntime = new InternalExecRuntime(LOG, connManager, requestExecutor, final ExecRuntime execRuntime = new InternalExecRuntime(LOG, connManager, requestExecutor,
request instanceof CancellableDependency ? (CancellableDependency) request : null); request instanceof CancellableDependency ? (CancellableDependency) request : null);
try { try {