Better HTTP execution context management by caching protocol handlers

This commit is contained in:
Oleg Kalnichevski 2023-11-11 20:23:45 +01:00
parent abb958ec27
commit 628a963a3a
4 changed files with 247 additions and 213 deletions

View File

@ -47,6 +47,7 @@ import org.apache.hc.client5.http.async.methods.SimpleBody;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.cache.CacheResponseStatus;
import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheContext;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.ResourceFactory;
import org.apache.hc.client5.http.cache.ResourceIOException;
@ -106,22 +107,6 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
BasicRequestBuilder.copy(request).build());
}
AsyncCachingExec(
final HttpAsyncCache responseCache,
final CacheValidityPolicy validityPolicy,
final ResponseCachingPolicy responseCachingPolicy,
final CachedHttpResponseGenerator responseGenerator,
final CacheableRequestPolicy cacheableRequestPolicy,
final CachedResponseSuitabilityChecker suitabilityChecker,
final DefaultAsyncCacheRevalidator cacheRevalidator,
final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder,
final CacheConfig config) {
super(validityPolicy, responseCachingPolicy, responseGenerator, cacheableRequestPolicy, suitabilityChecker, config);
this.responseCache = responseCache;
this.cacheRevalidator = cacheRevalidator;
this.conditionalRequestBuilder = conditionalRequestBuilder;
}
AsyncCachingExec(
final HttpAsyncCache cache,
final ScheduledExecutorService executorService,
@ -145,7 +130,6 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final SimpleHttpResponse cacheResponse,
final AsyncExecChain.Scope scope,
final AsyncExecCallback asyncExecCallback) {
scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, cacheResponse);
scope.execRuntime.releaseEndpoint();
final SimpleBody body = cacheResponse.getBody();
@ -213,20 +197,61 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
Args.notNull(scope, "Scope");
final HttpRoute route = scope.route;
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
final URIAuthority authority = request.getAuthority();
final String scheme = request.getScheme();
final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
doExecute(target,
request,
entityProducer,
scope,
chain,
new AsyncExecCallback() {
// default response context
setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public void handleInformationResponse(
final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void completed() {
asyncExecCallback.completed();
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
});
}
public void doExecute(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
final HttpClientContext context = scope.clientContext;
final CancellableDependency operation = scope.cancellableDependency;
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MISS);
if (clientRequestsOurOptions(request)) {
setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
triggerResponse(SimpleHttpResponse.create(HttpStatus.SC_NOT_IMPLEMENTED), scope, asyncExecCallback);
return;
}
@ -596,7 +621,12 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
final HttpClientContext context = scope.clientContext;
recordCacheHit(target, request);
if (LOG.isDebugEnabled()) {
LOG.debug("Request {} {}: cache hit", request.getMethod(), request.getRequestUri());
}
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_HIT);
cacheHits.getAndIncrement();
final Instant now = getCurrentDate();
final CacheSuitability cacheSuitability = suitabilityChecker.assessSuitability(requestCacheControl, responseCacheControl, request, hit.entry, now);
@ -604,17 +634,17 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
LOG.debug("Request {} {}: {}", request.getMethod(), request.getRequestUri(), cacheSuitability);
}
if (cacheSuitability == CacheSuitability.FRESH || cacheSuitability == CacheSuitability.FRESH_ENOUGH) {
LOG.debug("Cache hit");
LOG.debug("Cache hit is suitable");
try {
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, context, hit.entry);
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, hit.entry, now);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
recordCacheFailure(target, request);
if (!mayCallBackend(requestCacheControl)) {
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.FAILURE);
try {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final HttpException | IOException ex2) {
@ -625,7 +655,8 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
} else {
if (!mayCallBackend(requestCacheControl)) {
LOG.debug("Cache entry not is not fresh and only-if-cached requested");
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(context);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else if (cacheSuitability == CacheSuitability.MISMATCH) {
LOG.debug("Cache entry does not match the request; calling backend");
@ -638,39 +669,7 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
callBackend(target, request, entityProducer, scope, chain, asyncExecCallback);
} else if (cacheSuitability == CacheSuitability.REVALIDATION_REQUIRED) {
LOG.debug("Revalidation required; revalidating cache entry");
revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, new AsyncExecCallback() {
private final AtomicBoolean committed = new AtomicBoolean();
@Override
public AsyncDataConsumer handleResponse(final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
committed.set(true);
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void completed() {
asyncExecCallback.completed();
}
@Override
public void failed(final Exception cause) {
if (!committed.get() && cause instanceof IOException) {
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(scope.clientContext);
LOG.debug(cause.getMessage(), cause);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
asyncExecCallback.failed(cause);
}
}
});
revalidateCacheEntryWithoutFallback(responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
} else if (cacheSuitability == CacheSuitability.STALE_WHILE_REVALIDATED) {
if (cacheRevalidator != null) {
LOG.debug("Serving stale with asynchronous revalidation");
@ -690,7 +689,8 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
hit.getEntryKey(),
asyncExecCallback,
c -> revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, fork, chain, c));
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, context, hit.entry);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, hit.entry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final IOException ex) {
asyncExecCallback.failed(ex);
@ -723,13 +723,13 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
responseCacheControl,
BasicRequestBuilder.copy(scope.originalRequest).build(),
hit.entry);
final HttpClientContext context = scope.clientContext;
chainProceed(conditionalRequest, entityProducer, scope, chain, new AsyncExecCallback() {
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
void triggerUpdatedCacheEntryResponse(final HttpResponse backendResponse, final Instant responseDate) {
final CancellableDependency operation = scope.cancellableDependency;
recordCacheUpdate(scope.clientContext);
operation.setDependency(responseCache.update(
hit,
target,
@ -742,7 +742,7 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
@Override
public void completed(final CacheHit updated) {
try {
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, scope.clientContext, updated.entry);
final SimpleHttpResponse cacheResponse = generateCachedResponse(request, updated.entry, responseDate);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
@ -762,19 +762,11 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
}));
}
void triggerResponseStaleCacheEntry() {
try {
final SimpleHttpResponse cacheResponse = responseGenerator.generateResponse(request, hit.entry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final ResourceIOException ex) {
asyncExecCallback.failed(ex);
}
}
AsyncExecCallback evaluateResponse(final HttpResponse backendResponse, final Instant responseDate) {
final int statusCode = backendResponse.getCode();
if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
recordCacheUpdate(scope.clientContext);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED);
cacheUpdates.getAndIncrement();
}
if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
return new AsyncExecCallbackWrapper(() -> triggerUpdatedCacheEntryResponse(backendResponse, responseDate), asyncExecCallback::failed);
@ -787,7 +779,7 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final HttpResponse backendResponse1,
final EntityDetails entityDetails) throws HttpException, IOException {
final Instant responseDate1 = getCurrentDate();
final Instant responseDate = getCurrentDate();
final AsyncExecCallback callback1;
if (HttpCacheEntry.isNewer(hit.entry, backendResponse1)) {
@ -839,7 +831,7 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
}), asyncExecCallback::failed);
} else {
callback1 = evaluateResponse(backendResponse1, responseDate1);
callback1 = evaluateResponse(backendResponse1, responseDate);
}
callbackRef.set(callback1);
return callback1.handleResponse(backendResponse1, entityDetails);
@ -879,6 +871,52 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
}
void revalidateCacheEntryWithoutFallback(
final ResponseCacheControl responseCacheControl,
final CacheHit hit,
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
final HttpClientContext context = scope.clientContext;
revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, new AsyncExecCallback() {
private final AtomicBoolean committed = new AtomicBoolean();
@Override
public AsyncDataConsumer handleResponse(final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
committed.set(true);
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void completed() {
asyncExecCallback.completed();
}
@Override
public void failed(final Exception cause) {
if (!committed.get() && cause instanceof IOException) {
LOG.debug(cause.getMessage(), cause);
final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} else {
asyncExecCallback.failed(cause);
}
}
});
}
void revalidateCacheEntryWithFallback(
final RequestCacheControl requestCacheControl,
final ResponseCacheControl responseCacheControl,
@ -889,18 +927,22 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
final HttpClientContext context = scope.clientContext;
revalidateCacheEntry(responseCacheControl, hit, target, request, entityProducer, scope, chain, new AsyncExecCallback() {
private final AtomicBoolean committed = new AtomicBoolean();
private final AtomicReference<HttpResponse> committed = new AtomicReference<>();
@Override
public AsyncDataConsumer handleResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
final int status = response.getCode();
if (staleIfErrorAppliesTo(status) &&
suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Serving stale response due to {} status and stale-if-error enabled", status);
}
return null;
} else {
committed.set(true);
committed.set(response);
return asyncExecCallback.handleResponse(response, entityDetails);
}
}
@ -912,33 +954,41 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
@Override
public void completed() {
if (committed.get()) {
asyncExecCallback.completed();
} else {
final HttpResponse response = committed.get();
if (response == null) {
try {
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, scope.clientContext, hit.entry);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, hit.entry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final IOException ex) {
asyncExecCallback.failed(ex);
}
} else {
asyncExecCallback.completed();
}
}
@Override
public void failed(final Exception cause) {
if (!committed.get() &&
cause instanceof IOException &&
suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) {
try {
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, scope.clientContext, hit.entry);
final HttpResponse response = committed.get();
if (response == null) {
LOG.debug(cause.getMessage(), cause);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
if (cause instanceof IOException &&
suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) {
LOG.debug("Serving stale response due to IOException and stale-if-error enabled");
try {
final SimpleHttpResponse cacheResponse = unvalidatedCacheHit(request, hit.entry);
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final IOException ex) {
asyncExecCallback.failed(cause);
}
} else {
final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
triggerResponse(cacheResponse, scope, asyncExecCallback);
} catch (final IOException ex) {
asyncExecCallback.failed(cause);
}
} else {
LOG.debug(cause.getMessage(), cause);
final SimpleHttpResponse cacheResponse = generateGatewayTimeout(scope.clientContext);
triggerResponse(cacheResponse, scope, asyncExecCallback);
asyncExecCallback.failed(cause);
}
}
@ -953,11 +1003,16 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) {
recordCacheMiss(target, request);
cacheMisses.getAndIncrement();
if (LOG.isDebugEnabled()) {
LOG.debug("Request {} {}: cache miss", request.getMethod(), request.getRequestUri());
}
final CancellableDependency operation = scope.cancellableDependency;
if (!mayCallBackend(requestCacheControl)) {
final SimpleHttpResponse cacheResponse = SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
final SimpleHttpResponse cacheResponse = generateGatewayTimeout();
triggerResponse(cacheResponse, scope, asyncExecCallback);
}
@ -1017,7 +1072,10 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
final AtomicReference<AsyncExecCallback> callbackRef = new AtomicReference<>();
void updateVariantCacheEntry(final HttpResponse backendResponse, final Instant responseDate, final CacheHit match) {
recordCacheUpdate(scope.clientContext);
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED);
cacheUpdates.getAndIncrement();
operation.setDependency(responseCache.storeFromNegotiated(
match,
target,
@ -1141,8 +1199,7 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
if (HttpCacheEntry.isNewer(match.entry, backendResponse)) {
final HttpRequest unconditional = conditionalRequestBuilder.buildUnconditionalRequest(
BasicRequestBuilder.copy(request).build());
scope.clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, unconditional);
callback = new AsyncExecCallbackWrapper(() -> callBackend(target, request, entityProducer, scope, chain, asyncExecCallback), asyncExecCallback::failed);
callback = new AsyncExecCallbackWrapper(() -> callBackend(target, unconditional, entityProducer, scope, chain, asyncExecCallback), asyncExecCallback::failed);
} else {
callback = new AsyncExecCallbackWrapper(() -> updateVariantCacheEntry(backendResponse, responseDate, match), asyncExecCallback::failed);
}

View File

@ -39,6 +39,7 @@ import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.methods.SimpleBody;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.cache.CacheResponseStatus;
import org.apache.hc.client5.http.cache.HttpCacheContext;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheStorage;
import org.apache.hc.client5.http.cache.ResourceIOException;
@ -150,18 +151,30 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final HttpRoute route = scope.route;
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpClientContext.HTTP_ROUTE, scope.route);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
final URIAuthority authority = request.getAuthority();
final String scheme = request.getScheme();
final HttpHost target = authority != null ? new HttpHost(scheme, authority) : route.getTargetHost();
final ClassicHttpResponse response = doExecute(target, request, scope, chain);
// default response context
setResponseStatus(context, CacheResponseStatus.CACHE_MISS);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
return response;
}
ClassicHttpResponse doExecute(
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain) throws IOException, HttpException {
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MISS);
if (clientRequestsOurOptions(request)) {
setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return new BasicClassicHttpResponse(HttpStatus.SC_NOT_IMPLEMENTED);
}
final CacheMatch result = responseCache.match(target, request);
@ -177,7 +190,6 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
return callBackend(target, request, scope, chain);
}
if (hit == null) {
LOG.debug("Cache miss");
return handleCacheMiss(requestCacheControl, root, target, request, scope, chain);
@ -190,7 +202,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
}
}
private static ClassicHttpResponse convert(final SimpleHttpResponse cacheResponse, final ExecChain.Scope scope) {
private static ClassicHttpResponse convert(final SimpleHttpResponse cacheResponse) {
if (cacheResponse == null) {
return null;
}
@ -210,7 +222,6 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
response.setEntity(new ByteArrayEntity(body.getBodyBytes(), contentType, contentEncoding, false));
}
}
scope.clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
return response;
}
@ -225,7 +236,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
LOG.debug("Calling the backend");
final ClassicHttpResponse backendResponse = chain.proceed(request, scope);
try {
return handleBackendResponse(target, request, scope, requestDate, getCurrentDate(), backendResponse);
return handleBackendResponse(target, request, requestDate, getCurrentDate(), backendResponse);
} catch (final IOException | RuntimeException ex) {
backendResponse.close();
throw ex;
@ -241,8 +252,12 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final ExecChain.Scope scope,
final ExecChain chain) throws IOException, HttpException {
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
recordCacheHit(target, request);
if (LOG.isDebugEnabled()) {
LOG.debug("Request {} {}: cache hit", request.getMethod(), request.getRequestUri());
}
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_HIT);
cacheHits.getAndIncrement();
final Instant now = getCurrentDate();
final CacheSuitability cacheSuitability = suitabilityChecker.assessSuitability(requestCacheControl, responseCacheControl, request, hit.entry, now);
@ -250,21 +265,22 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
LOG.debug("Request {} {}: {}", request.getMethod(), request.getRequestUri(), cacheSuitability);
}
if (cacheSuitability == CacheSuitability.FRESH || cacheSuitability == CacheSuitability.FRESH_ENOUGH) {
LOG.debug("Cache hit");
LOG.debug("Cache hit is suitable");
try {
return convert(generateCachedResponse(request, context, hit.entry), scope);
return convert(generateCachedResponse(request, hit.entry, now));
} catch (final ResourceIOException ex) {
recordCacheFailure(target, request);
if (!mayCallBackend(requestCacheControl)) {
return convert(generateGatewayTimeout(context), scope);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(generateGatewayTimeout());
}
setResponseStatus(scope.clientContext, CacheResponseStatus.FAILURE);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.FAILURE);
return chain.proceed(request, scope);
}
} else {
if (!mayCallBackend(requestCacheControl)) {
LOG.debug("Cache entry not is not fresh and only-if-cached requested");
return convert(generateGatewayTimeout(context), scope);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(generateGatewayTimeout());
} else if (cacheSuitability == CacheSuitability.MISMATCH) {
LOG.debug("Cache entry does not match the request; calling backend");
return callBackend(target, request, scope, chain);
@ -276,12 +292,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
return callBackend(target, request, scope, chain);
} else if (cacheSuitability == CacheSuitability.REVALIDATION_REQUIRED) {
LOG.debug("Revalidation required; revalidating cache entry");
try {
return revalidateCacheEntry(responseCacheControl, hit, target, request, scope, chain);
} catch (final IOException ex) {
LOG.debug(ex.getMessage(), ex);
return convert(generateGatewayTimeout(scope.clientContext), scope);
}
return revalidateCacheEntryWithoutFallback(responseCacheControl, hit, target, request, scope, chain);
} else if (cacheSuitability == CacheSuitability.STALE_WHILE_REVALIDATED) {
if (cacheRevalidator != null) {
LOG.debug("Serving stale with asynchronous revalidation");
@ -296,8 +307,8 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
cacheRevalidator.revalidateCacheEntry(
hit.getEntryKey(),
() -> revalidateCacheEntry(responseCacheControl, hit, target, request, fork, chain));
final SimpleHttpResponse response = unvalidatedCacheHit(request, context, hit.entry);
return convert(response, scope);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(unvalidatedCacheHit(request, hit.entry));
} else {
LOG.debug("Revalidating stale cache entry (asynchronous revalidation disabled)");
return revalidateCacheEntryWithFallback(requestCacheControl, responseCacheControl, hit, target, request, scope, chain);
@ -319,6 +330,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain) throws IOException, HttpException {
final HttpClientContext context = scope.clientContext;
Instant requestDate = getCurrentDate();
final ClassicHttpRequest conditionalRequest = conditionalRequestBuilder.buildConditionalRequest(
responseCacheControl, scope.originalRequest, hit.entry);
@ -338,19 +350,37 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final int statusCode = backendResponse.getCode();
if (statusCode == HttpStatus.SC_NOT_MODIFIED || statusCode == HttpStatus.SC_OK) {
recordCacheUpdate(scope.clientContext);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED);
cacheUpdates.getAndIncrement();
}
if (statusCode == HttpStatus.SC_NOT_MODIFIED) {
final CacheHit updated = responseCache.update(hit, target, request, backendResponse, requestDate, responseDate);
return convert(generateCachedResponse(request, scope.clientContext, updated.entry), scope);
return convert(generateCachedResponse(request, updated.entry, responseDate));
}
return handleBackendResponse(target, conditionalRequest, scope, requestDate, responseDate, backendResponse);
return handleBackendResponse(target, conditionalRequest, requestDate, responseDate, backendResponse);
} catch (final IOException | RuntimeException ex) {
backendResponse.close();
throw ex;
}
}
ClassicHttpResponse revalidateCacheEntryWithoutFallback(
final ResponseCacheControl responseCacheControl,
final CacheHit hit,
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain) throws HttpException {
final HttpClientContext context = scope.clientContext;
try {
return revalidateCacheEntry(responseCacheControl, hit, target, request, scope, chain);
} catch (final IOException ex) {
LOG.debug(ex.getMessage(), ex);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(generateGatewayTimeout());
}
}
ClassicHttpResponse revalidateCacheEntryWithFallback(
final RequestCacheControl requestCacheControl,
final ResponseCacheControl responseCacheControl,
@ -364,14 +394,13 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
try {
response = revalidateCacheEntry(responseCacheControl, hit, target, request, scope, chain);
} catch (final IOException ex) {
LOG.debug(ex.getMessage(), ex);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
if (suitabilityChecker.isSuitableIfError(requestCacheControl, responseCacheControl, hit.entry, getCurrentDate())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Serving stale response due to IOException and stale-if-error enabled");
}
return convert(unvalidatedCacheHit(request, context, hit.entry), scope);
LOG.debug("Serving stale response due to IOException and stale-if-error enabled");
return convert(unvalidatedCacheHit(request, hit.entry));
} else {
LOG.debug(ex.getMessage(), ex);
return convert(generateGatewayTimeout(context), scope);
return convert(generateGatewayTimeout());
}
}
final int status = response.getCode();
@ -381,7 +410,8 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
LOG.debug("Serving stale response due to {} status and stale-if-error enabled", status);
}
EntityUtils.consume(response.getEntity());
return convert(unvalidatedCacheHit(request, context, hit.entry), scope);
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(unvalidatedCacheHit(request, hit.entry));
}
return response;
}
@ -389,7 +419,6 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
ClassicHttpResponse handleBackendResponse(
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final Instant requestDate,
final Instant responseDate,
final ClassicHttpResponse backendResponse) throws IOException {
@ -403,7 +432,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final boolean cacheable = responseCachingPolicy.isResponseCacheable(responseCacheControl, request, backendResponse);
if (cacheable) {
storeRequestIfModifiedSinceFor304Response(request, backendResponse);
return cacheAndReturnResponse(target, request, backendResponse, scope, requestDate, responseDate);
return cacheAndReturnResponse(target, request, backendResponse, requestDate, responseDate);
}
LOG.debug("Backend response is not cacheable");
return backendResponse;
@ -413,7 +442,6 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final HttpHost target,
final HttpRequest request,
final ClassicHttpResponse backendResponse,
final ExecChain.Scope scope,
final Instant requestSent,
final Instant responseReceived) throws IOException {
LOG.debug("Caching backend response");
@ -430,7 +458,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
backendResponse,
requestSent,
responseReceived);
return convert(responseGenerator.generateResponse(request, updated.entry), scope);
return convert(responseGenerator.generateResponse(request, updated.entry));
}
}
@ -470,7 +498,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
hit = responseCache.store(target, request, backendResponse, buf, requestSent, responseReceived);
LOG.debug("Backend response successfully cached (freshness check skipped)");
}
return convert(responseGenerator.generateResponse(request, hit.entry), scope);
return convert(responseGenerator.generateResponse(request, hit.entry));
}
private ClassicHttpResponse handleCacheMiss(
@ -480,10 +508,15 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain) throws IOException, HttpException {
recordCacheMiss(target, request);
cacheMisses.getAndIncrement();
if (LOG.isDebugEnabled()) {
LOG.debug("Request {} {}: cache miss", request.getMethod(), request.getRequestUri());
}
final HttpClientContext context = scope.clientContext;
if (!mayCallBackend(requestCacheControl)) {
return new BasicClassicHttpResponse(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.CACHE_MODULE_RESPONSE);
return convert(generateGatewayTimeout());
}
if (partialMatch != null && partialMatch.entry.hasVariants() && request.getEntity() == null) {
final List<CacheHit> variants = responseCache.getVariants(partialMatch);
@ -517,7 +550,7 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
final Instant responseDate = getCurrentDate();
if (backendResponse.getCode() != HttpStatus.SC_NOT_MODIFIED) {
return handleBackendResponse(target, request, scope, requestDate, responseDate, backendResponse);
return handleBackendResponse(target, request, requestDate, responseDate, backendResponse);
} else {
// 304 response are not expected to have an enclosed content body, but still
backendResponse.close();
@ -541,13 +574,15 @@ class CachingExec extends CachingExecBase implements ExecChainHandler {
return callBackend(target, unconditional, scope, chain);
}
recordCacheUpdate(scope.clientContext);
final HttpClientContext context = scope.clientContext;
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, CacheResponseStatus.VALIDATED);
cacheUpdates.getAndIncrement();
final CacheHit hit = responseCache.storeFromNegotiated(match, target, request, backendResponse, requestDate, responseDate);
if (shouldSendNotModifiedResponse(request, hit.entry, Instant.now())) {
return convert(responseGenerator.generateNotModifiedResponse(hit.entry), scope);
if (shouldSendNotModifiedResponse(request, hit.entry, responseDate)) {
return convert(responseGenerator.generateNotModifiedResponse(hit.entry));
} else {
return convert(responseGenerator.generateResponse(request, hit.entry), scope);
return convert(responseGenerator.generateResponse(request, hit.entry));
}
} catch (final IOException | RuntimeException ex) {
backendResponse.close();

View File

@ -31,19 +31,15 @@ import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.cache.CacheResponseStatus;
import org.apache.hc.client5.http.cache.HttpCacheContext;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,57 +113,23 @@ public class CachingExecBase {
return cacheUpdates.get();
}
void recordCacheMiss(final HttpHost target, final HttpRequest request) {
cacheMisses.getAndIncrement();
if (LOG.isDebugEnabled()) {
LOG.debug("Cache miss [host: {}; uri: {}]", target, request.getRequestUri());
}
}
void recordCacheHit(final HttpHost target, final HttpRequest request) {
cacheHits.getAndIncrement();
if (LOG.isDebugEnabled()) {
LOG.debug("Cache hit [host: {}; uri: {}]", target, request.getRequestUri());
}
}
void recordCacheFailure(final HttpHost target, final HttpRequest request) {
cacheMisses.getAndIncrement();
if (LOG.isDebugEnabled()) {
LOG.debug("Cache failure [host: {}; uri: {}]", target, request.getRequestUri());
}
}
void recordCacheUpdate(final HttpContext context) {
cacheUpdates.getAndIncrement();
setResponseStatus(context, CacheResponseStatus.VALIDATED);
}
SimpleHttpResponse generateCachedResponse(
final HttpRequest request,
final HttpContext context,
final HttpCacheEntry entry) throws ResourceIOException {
setResponseStatus(context, CacheResponseStatus.CACHE_HIT);
if (shouldSendNotModifiedResponse(request, entry, Instant.now())) {
final HttpCacheEntry entry,
final Instant now) throws ResourceIOException {
if (shouldSendNotModifiedResponse(request, entry, now)) {
return responseGenerator.generateNotModifiedResponse(entry);
} else {
return responseGenerator.generateResponse(request, entry);
}
}
SimpleHttpResponse generateGatewayTimeout(
final HttpContext context) {
setResponseStatus(context, CacheResponseStatus.CACHE_MODULE_RESPONSE);
SimpleHttpResponse generateGatewayTimeout() {
return SimpleHttpResponse.create(HttpStatus.SC_GATEWAY_TIMEOUT, "Gateway Timeout");
}
SimpleHttpResponse unvalidatedCacheHit(
final HttpRequest request,
final HttpContext context,
final HttpCacheEntry entry) throws IOException {
final SimpleHttpResponse cachedResponse = responseGenerator.generateResponse(request, entry);
setResponseStatus(context, CacheResponseStatus.CACHE_HIT);
return cachedResponse;
SimpleHttpResponse unvalidatedCacheHit(final HttpRequest request, final HttpCacheEntry entry) throws IOException {
return responseGenerator.generateResponse(request, entry);
}
boolean mayCallBackend(final RequestCacheControl requestCacheControl) {
@ -178,12 +140,6 @@ public class CachingExecBase {
return true;
}
void setResponseStatus(final HttpContext context, final CacheResponseStatus value) {
if (context != null) {
context.setAttribute(HttpCacheContext.CACHE_RESPONSE_STATUS, value);
}
}
Instant getCurrentDate() {
return Instant.now();
}

View File

@ -900,8 +900,7 @@ public class TestCachingExecChain {
originResponse.setHeader("Date", DateUtils.formatStandardDate(responseGenerated));
originResponse.setHeader("ETag", "\"etag\"");
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, mockExecRuntime, context);
impl.cacheAndReturnResponse(host, request, originResponse, scope, requestSent, responseReceived);
impl.cacheAndReturnResponse(host, request, originResponse, requestSent, responseReceived);
Mockito.verify(cache, Mockito.never()).store(
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
@ -937,8 +936,7 @@ public class TestCachingExecChain {
Mockito.eq(requestSent),
Mockito.eq(responseReceived))).thenReturn(new CacheHit("key", httpCacheEntry));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, mockExecRuntime, context);
impl.cacheAndReturnResponse(host, request, originResponse, scope, requestSent, responseReceived);
impl.cacheAndReturnResponse(host, request, originResponse, requestSent, responseReceived);
Mockito.verify(mockCache).store(
Mockito.any(),
@ -958,18 +956,6 @@ public class TestCachingExecChain {
Assertions.assertEquals(HttpStatus.SC_GATEWAY_TIMEOUT, resp.getCode());
}
@Test
public void testSetsRouteInContextOnCacheHit() throws Exception {
final ClassicHttpResponse response = HttpTestUtils.make200Response();
response.setHeader("Cache-Control", "max-age=3600");
Mockito.when(mockExecChain.proceed(RequestEquivalent.eq(request), Mockito.any())).thenReturn(response);
final HttpClientContext ctx = HttpClientContext.create();
impl.execute(request, new ExecChain.Scope("test", route, request, mockExecRuntime, context), mockExecChain);
impl.execute(request, new ExecChain.Scope("test", route, request, mockExecRuntime, ctx), mockExecChain);
Assertions.assertEquals(route, ctx.getHttpRoute());
}
@Test
public void testSetsRequestInContextOnCacheHit() throws Exception {
final ClassicHttpResponse response = HttpTestUtils.make200Response();
@ -1288,7 +1274,7 @@ public class TestCachingExecChain {
.thenReturn(new CacheHit("key", cacheEntry));
// Call cacheAndReturnResponse with 304 Not Modified response
final ClassicHttpResponse cachedResponse = impl.cacheAndReturnResponse(host, request, backendResponse, scope, requestSent, responseReceived);
final ClassicHttpResponse cachedResponse = impl.cacheAndReturnResponse(host, request, backendResponse, requestSent, responseReceived);
// Verify cache entry is updated
Mockito.verify(mockCache).update(