Redesigned cache entry re-validation logic; added classic and async implementations of cache re-validators

This commit is contained in:
Oleg Kalnichevski 2018-01-06 11:45:13 +01:00
parent 79b76030fd
commit 7cf4240b3f
9 changed files with 619 additions and 728 deletions

View File

@ -1,169 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.io.IOException;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Class used to represent an asynchronous revalidation event, such as with
* "stale-while-revalidate"
*/
public class AsynchronousValidationRequest implements Runnable {
private final AsynchronousValidator parent;
private final CachingExec cachingExec;
private final HttpHost target;
private final ClassicHttpRequest request;
private final ExecChain.Scope scope;
private final ExecChain chain;
private final HttpCacheEntry cacheEntry;
private final String identifier;
private final int consecutiveFailedAttempts;
private final Logger log = LogManager.getLogger(getClass());
/**
* Used internally by {@link AsynchronousValidator} to schedule a
*/
AsynchronousValidationRequest(
final AsynchronousValidator parent,
final CachingExec cachingExec,
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain,
final HttpCacheEntry cacheEntry,
final String identifier,
final int consecutiveFailedAttempts) {
this.parent = parent;
this.cachingExec = cachingExec;
this.target = target;
this.request = request;
this.scope = scope;
this.chain = chain;
this.cacheEntry = cacheEntry;
this.identifier = identifier;
this.consecutiveFailedAttempts = consecutiveFailedAttempts;
}
@Override
public void run() {
try {
if (revalidateCacheEntry()) {
parent.jobSuccessful(identifier);
} else {
parent.jobFailed(identifier);
}
} finally {
parent.markComplete(identifier);
}
}
/**
* Revalidate the cache entry and return if the operation was successful.
* Success means a connection to the server was established and replay did
* not indicate a server error.
* @return {@code true} if the cache entry was successfully validated;
* otherwise {@code false}
*/
private boolean revalidateCacheEntry() {
try {
try (ClassicHttpResponse httpResponse = cachingExec.revalidateCacheEntry(target, request, scope, chain, cacheEntry)) {
final int statusCode = httpResponse.getCode();
return isNotServerError(statusCode) && isNotStale(httpResponse);
}
} catch (final IOException ioe) {
log.debug("Asynchronous revalidation failed due to I/O error", ioe);
return false;
} catch (final HttpException pe) {
log.error("HTTP protocol exception during asynchronous revalidation", pe);
return false;
} catch (final RuntimeException re) {
log.error("RuntimeException thrown during asynchronous revalidation: " + re);
return false;
}
}
/**
* Return whether the status code indicates a server error or not.
* @param statusCode the status code to be checked
* @return if the status code indicates a server error or not
*/
private boolean isNotServerError(final int statusCode) {
return statusCode < 500;
}
/**
* Try to detect if the returned response is generated from a stale cache entry.
* @param httpResponse the response to be checked
* @return whether the response is stale or not
*/
private boolean isNotStale(final HttpResponse httpResponse) {
final Header[] warnings = httpResponse.getHeaders(HeaderConstants.WARNING);
if (warnings != null)
{
for (final Header warning : warnings)
{
/**
* warn-codes
* 110 = Response is stale
* 111 = Revalidation failed
*/
final String warningValue = warning.getValue();
if (warningValue.startsWith("110") || warningValue.startsWith("111"))
{
return false;
}
}
}
return true;
}
public String getIdentifier() {
return identifier;
}
/**
* The number of consecutively failed revalidation attempts.
* @return the number of consecutively failed revalidation attempts.
*/
public int getConsecutiveFailedAttempts() {
return consecutiveFailedAttempts;
}
}

View File

@ -1,168 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.impl.schedule.ImmediateSchedulingStrategy;
import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Class used for asynchronous revalidations to be used when the "stale-
* while-revalidate" directive is present
*/
class AsynchronousValidator implements Closeable {
private final ScheduledExecutorService executorService;
private final SchedulingStrategy schedulingStrategy;
private final Set<String> queued;
private final CacheKeyGenerator cacheKeyGenerator;
private final ConcurrentCountMap<String> failureCache;
private final Logger log = LogManager.getLogger(getClass());
/**
* Create AsynchronousValidator which will make revalidation requests
* using an {@link ImmediateSchedulingStrategy}. Its thread
* pool will be configured according to the given {@link CacheConfig}.
* @param config specifies thread pool settings. See
* {@link CacheConfig#getAsynchronousWorkersMax()},
* {@link CacheConfig#getAsynchronousWorkersCore()},
* {@link CacheConfig#getAsynchronousWorkerIdleLifetimeSecs()},
* and {@link CacheConfig#getRevalidationQueueSize()}.
*/
public AsynchronousValidator(final CacheConfig config) {
this(new ScheduledThreadPoolExecutor(config.getAsynchronousWorkersCore()), new ImmediateSchedulingStrategy());
}
/**
* Create AsynchronousValidator which will make revalidation requests
* using the supplied {@link SchedulingStrategy}. Closing the validator
* will also close the given schedulingStrategy.
* @param schedulingStrategy used to maintain a pool of worker threads and
* schedules when requests are executed
*/
AsynchronousValidator(final ScheduledExecutorService executorService, final SchedulingStrategy schedulingStrategy) {
this.executorService = executorService;
this.schedulingStrategy = schedulingStrategy;
this.queued = new HashSet<>();
this.cacheKeyGenerator = CacheKeyGenerator.INSTANCE;
this.failureCache = new ConcurrentCountMap<>();
}
/**
* Schedules an asynchronous revalidation
*/
public synchronized void revalidateCacheEntry(
final CachingExec cachingExec,
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain,
final HttpCacheEntry entry) {
// getVariantURI will fall back on getURI if no variants exist
final String cacheKey = cacheKeyGenerator.generateVariantURI(target, request, entry);
if (!queued.contains(cacheKey)) {
final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
final AsynchronousValidationRequest revalidationRequest =
new AsynchronousValidationRequest(
this, cachingExec, target, request, scope, chain, entry, cacheKey, consecutiveFailedAttempts);
final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
try {
executorService.schedule(revalidationRequest, executionTime.getDuration(), executionTime.getTimeUnit());
queued.add(cacheKey);
} catch (final RejectedExecutionException ree) {
log.debug("Revalidation for [" + cacheKey + "] not scheduled: " + ree);
}
}
}
@Override
public void close() throws IOException {
executorService.shutdown();
}
public void awaitTermination(final Timeout timeout) throws InterruptedException {
Args.notNull(timeout, "Timeout");
executorService.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
}
/**
* Removes an identifier from the internal list of revalidation jobs in
* progress. This is meant to be called by
* {@link AsynchronousValidationRequest#run()} once the revalidation is
* complete, using the identifier passed in during constructions.
* @param identifier
*/
synchronized void markComplete(final String identifier) {
queued.remove(identifier);
}
/**
* The revalidation job was successful thus the number of consecutive
* failed attempts will be reset to zero. Should be called by
* {@link AsynchronousValidationRequest#run()}.
* @param identifier the revalidation job's unique identifier
*/
void jobSuccessful(final String identifier) {
failureCache.resetCount(identifier);
}
/**
* The revalidation job did fail and thus the number of consecutive failed
* attempts will be increased. Should be called by
* {@link AsynchronousValidationRequest#run()}.
* @param identifier the revalidation job's unique identifier
*/
void jobFailed(final String identifier) {
failureCache.increaseCount(identifier);
}
Set<String> getScheduledIdentifiers() {
return Collections.unmodifiableSet(queued);
}
}

View File

@ -61,9 +61,9 @@ class CacheInvalidatorBase {
}
static boolean notGetOrHeadRequest(final String method) {
return !(HeaderConstants.GET_METHOD.equals(method) || HeaderConstants.HEAD_METHOD
.equals(method));
return !(HeaderConstants.GET_METHOD.equals(method) || HeaderConstants.HEAD_METHOD.equals(method));
}
private static URI getLocationURI(final URI requestUri, final HttpResponse response, final String headerName) {
final Header h = response.getFirstHeader(headerName);
if (h == null) {

View File

@ -0,0 +1,190 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Abstract cache re-validation class.
*/
class CacheRevalidatorBase implements Closeable {
interface ScheduledExecutor {
Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
void shutdown();
void awaitTermination(final Timeout timeout) throws InterruptedException;
}
public static ScheduledExecutor wrap(final ScheduledThreadPoolExecutor threadPoolExecutor) {
return new ScheduledExecutor() {
@Override
public ScheduledFuture<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
Args.notNull(command, "Runnable");
Args.notNull(timeValue, "Time value");
return threadPoolExecutor.schedule(command, timeValue.getDuration(), timeValue.getTimeUnit());
}
@Override
public void shutdown() {
threadPoolExecutor.shutdown();
}
@Override
public void awaitTermination(final Timeout timeout) throws InterruptedException {
Args.notNull(timeout, "Timeout");
threadPoolExecutor.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
}
};
}
private final ScheduledExecutor scheduledExecutor;
private final SchedulingStrategy schedulingStrategy;
private final Set<String> pendingRequest;
private final ConcurrentCountMap<String> failureCache;
final Logger log = LogManager.getLogger(getClass());
/**
* Create CacheValidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledExecutor}.
*/
public CacheRevalidatorBase(
final ScheduledExecutor scheduledExecutor,
final SchedulingStrategy schedulingStrategy) {
this.scheduledExecutor = scheduledExecutor;
this.schedulingStrategy = schedulingStrategy;
this.pendingRequest = new HashSet<>();
this.failureCache = new ConcurrentCountMap<>();
}
/**
* Create CacheValidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledThreadPoolExecutor}.
*/
public CacheRevalidatorBase(
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
final SchedulingStrategy schedulingStrategy) {
this(wrap(scheduledThreadPoolExecutor), schedulingStrategy);
}
/**
* Schedules an asynchronous re-validation
*/
void scheduleRevalidation(final String cacheKey, final Runnable command) {
synchronized (pendingRequest) {
if (!pendingRequest.contains(cacheKey)) {
final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
try {
scheduledExecutor.schedule(command, executionTime);
pendingRequest.add(cacheKey);
} catch (final RejectedExecutionException ex) {
log.debug("Revalidation of cache entry with key " + cacheKey + "could not be scheduled: " + ex);
}
}
}
}
@Override
public void close() throws IOException {
scheduledExecutor.shutdown();
}
public void awaitTermination(final Timeout timeout) throws InterruptedException {
Args.notNull(timeout, "Timeout");
scheduledExecutor.awaitTermination(timeout);
}
void jobSuccessful(final String identifier) {
failureCache.resetCount(identifier);
synchronized (pendingRequest) {
pendingRequest.remove(identifier);
}
}
void jobFailed(final String identifier) {
failureCache.increaseCount(identifier);
synchronized (pendingRequest) {
pendingRequest.remove(identifier);
}
}
Set<String> getScheduledIdentifiers() {
synchronized (pendingRequest) {
return new HashSet<>(pendingRequest);
}
}
/**
* Determines if the given response is generated from a stale cache entry.
* @param httpResponse the response to be checked
* @return whether the response is stale or not
*/
boolean isStale(final HttpResponse httpResponse) {
for (final Iterator<Header> it = httpResponse.headerIterator(HeaderConstants.WARNING); it.hasNext(); ) {
/*
* warn-codes
* 110 = Response is stale
* 111 = Revalidation failed
*/
final Header warning = it.next();
final String warningValue = warning.getValue();
if (warningValue.startsWith("110") || warningValue.startsWith("111")) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,160 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
/**
* Class used for asynchronous revalidations to be used when the {@code stale-while-revalidate}
* directive is present
*/
class DefaultAsyncCacheRevalidator extends CacheRevalidatorBase {
private static final Future<Void> NOOP_FUTURE = new Future<Void>() {
@Override
public Void get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Void get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
};
static class InternalScheduledExecutor implements ScheduledExecutor {
private final ScheduledExecutor executor;
InternalScheduledExecutor(final ScheduledExecutor executor) {
this.executor = executor;
}
@Override
public Future<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
if (timeValue.toMillis() <= 0) {
command.run();
return NOOP_FUTURE;
} else {
return executor.schedule(command, timeValue);
}
}
@Override
public void shutdown() {
executor.shutdown();
}
@Override
public void awaitTermination(final Timeout timeout) throws InterruptedException {
executor.awaitTermination(timeout);
}
}
private final AsyncCachingExec cachingExec;
private final CacheKeyGenerator cacheKeyGenerator;
/**
* Create DefaultCacheRevalidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledExecutor}.
*/
public DefaultAsyncCacheRevalidator(
final ScheduledExecutor scheduledExecutor,
final SchedulingStrategy schedulingStrategy,
final AsyncCachingExec cachingExec) {
super(new InternalScheduledExecutor(scheduledExecutor), schedulingStrategy);
this.cachingExec = cachingExec;
this.cacheKeyGenerator = CacheKeyGenerator.INSTANCE;
}
/**
* Create CacheValidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledThreadPoolExecutor}.
*/
public DefaultAsyncCacheRevalidator(
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
final SchedulingStrategy schedulingStrategy,
final AsyncCachingExec cachingExec) {
this(wrap(scheduledThreadPoolExecutor), schedulingStrategy, cachingExec);
}
/**
* Schedules an asynchronous re-validation
*/
public void revalidateCacheEntry(
final HttpHost target,
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final HttpCacheEntry entry) {
final String cacheKey = cacheKeyGenerator.generateVariantURI(target, request, entry);
scheduleRevalidation(cacheKey, new Runnable() {
@Override
public void run() {
cachingExec.revalidateCacheEntry(target, request, entityProducer, scope, chain, asyncExecCallback, entry);
}
});
}
}

View File

@ -0,0 +1,113 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpStatus;
/**
* Class used for asynchronous revalidations to be used when
* the {@code stale-while-revalidate} directive is present
*/
class DefaultCacheRevalidator extends CacheRevalidatorBase {
private final CachingExec cachingExec;
private final CacheKeyGenerator cacheKeyGenerator;
/**
* Create DefaultCacheRevalidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledExecutor}.
*/
public DefaultCacheRevalidator(
final CacheRevalidatorBase.ScheduledExecutor scheduledExecutor,
final SchedulingStrategy schedulingStrategy,
final CachingExec cachingExec) {
super(scheduledExecutor, schedulingStrategy);
this.cachingExec = cachingExec;
this.cacheKeyGenerator = CacheKeyGenerator.INSTANCE;
}
/**
* Create CacheValidator which will make ache revalidation requests
* using the supplied {@link SchedulingStrategy} and {@link ScheduledThreadPoolExecutor}.
*/
public DefaultCacheRevalidator(
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
final SchedulingStrategy schedulingStrategy,
final CachingExec cachingExec) {
this(wrap(scheduledThreadPoolExecutor), schedulingStrategy, cachingExec);
}
/**
* Schedules an asynchronous re-validation
*/
public void revalidateCacheEntry(
final HttpHost target,
final ClassicHttpRequest request,
final ExecChain.Scope scope,
final ExecChain chain,
final HttpCacheEntry entry) {
final String cacheKey = cacheKeyGenerator.generateVariantURI(target, request, entry);
scheduleRevalidation(cacheKey, new Runnable() {
@Override
public void run() {
try {
try (ClassicHttpResponse httpResponse = cachingExec.revalidateCacheEntry(target, request, scope, chain, entry)) {
if (httpResponse.getCode() < HttpStatus.SC_SERVER_ERROR && !isStale(httpResponse)) {
jobSuccessful(cacheKey);
} else {
jobFailed(cacheKey);
}
}
} catch (final IOException ioe) {
jobFailed(cacheKey);
log.debug("Asynchronous revalidation failed due to I/O error", ioe);
} catch (final HttpException pe) {
jobFailed(cacheKey);
log.error("HTTP protocol exception during asynchronous revalidation", pe);
} catch (final RuntimeException re) {
jobFailed(cacheKey);
log.error("Unexpected runtime exception thrown during asynchronous revalidation" + re);
}
}
});
}
}

View File

@ -1,193 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.classic.ExecRuntime;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings({"boxing","static-access"}) // test code
public class TestAsynchronousValidationRequest {
private AsynchronousValidator mockParent;
private CachingExec mockClient;
private HttpHost host;
private HttpRoute route;
private ClassicHttpRequest request;
private HttpClientContext context;
private ExecChain.Scope scope;
private ExecChain execChain;
private ExecRuntime mockEndpoint;
private HttpCacheEntry mockCacheEntry;
private ClassicHttpResponse mockResponse;
@Before
public void setUp() {
mockParent = mock(AsynchronousValidator.class);
mockClient = mock(CachingExec.class);
host = new HttpHost("foo.example.com", 80);
route = new HttpRoute(host);
request = new HttpGet("/");
context = HttpClientContext.create();
mockEndpoint = mock(ExecRuntime.class);
execChain = mock(ExecChain.class);
mockCacheEntry = mock(HttpCacheEntry.class);
mockResponse = mock(ClassicHttpResponse.class);
scope = new ExecChain.Scope("test", route, request, mockEndpoint, context);
}
@Test
public void testRunCallsCachingClientAndRemovesIdentifier() throws Exception {
final String identifier = "foo";
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry)).thenReturn(mockResponse);
when(mockResponse.getCode()).thenReturn(200);
impl.run();
verify(mockClient).revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobSuccessful(identifier);
}
@Test
public void testRunReportsJobFailedForServerError() throws Exception {
final String identifier = "foo";
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry)).thenReturn(mockResponse);
when(mockResponse.getCode()).thenReturn(200);
impl.run();
verify(mockClient).revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobSuccessful(identifier);
}
@Test
public void testRunReportsJobFailedForStaleResponse() throws Exception {
final String identifier = "foo";
final Header[] warning = new Header[] {new BasicHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"")};
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry))
.thenReturn(mockResponse);
when(mockResponse.getCode()).thenReturn(200);
when(mockResponse.getHeaders(HeaderConstants.WARNING)).thenReturn(warning);
impl.run();
verify(mockClient).revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry);
verify(mockResponse).getHeaders(HeaderConstants.WARNING);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobFailed(identifier);
}
@Test
public void testRunGracefullyHandlesProtocolException() throws Exception {
final String identifier = "foo";
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry))
.thenThrow(new ProtocolException());
impl.run();
verify(mockClient).revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobFailed(identifier);
}
@Test
public void testRunGracefullyHandlesIOException() throws Exception {
final String identifier = "foo";
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry))
.thenThrow(new IOException());
impl.run();
verify(mockClient).revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobFailed(identifier);
}
@Test
public void testRunGracefullyHandlesRuntimeException() throws Exception {
final String identifier = "foo";
final AsynchronousValidationRequest impl = new AsynchronousValidationRequest(
mockParent, mockClient, host, request, scope, execChain, mockCacheEntry,
identifier, 0);
when(mockClient.revalidateCacheEntry(host, request, scope, execChain, mockCacheEntry))
.thenThrow(new RuntimeException());
impl.run();
verify(mockClient).revalidateCacheEntry(
host, request, scope, execChain, mockCacheEntry);
verify(mockParent).markComplete(identifier);
verify(mockParent).jobFailed(identifier);
}
}

View File

@ -1,196 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.classic.ExecRuntime;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicHeaderIterator;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TestAsynchronousValidator {
@Mock
private CachingExec mockClient;
@Mock
private ExecChain mockExecChain;
@Mock
private ExecRuntime mockEndpoint;
@Mock
private HttpCacheEntry mockCacheEntry;
@Mock
private SchedulingStrategy mockSchedulingStrategy;
@Mock
ScheduledExecutorService mockExecutorService;
private HttpHost host;
private HttpRoute route;
private ClassicHttpRequest request;
private HttpClientContext context;
private ExecChain.Scope scope;
private AsynchronousValidator impl;
@Before
public void setUp() {
host = new HttpHost("foo.example.com", 80);
route = new HttpRoute(host);
request = new HttpGet("/");
context = HttpClientContext.create();
scope = new ExecChain.Scope("test", route, request, mockEndpoint, context);
impl = new AsynchronousValidator(mockExecutorService, mockSchedulingStrategy);
}
@Test
public void testRevalidateCacheEntrySchedulesExecutionAndPopulatesIdentifier() {
when(mockCacheEntry.hasVariants()).thenReturn(false);
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(1));
impl.revalidateCacheEntry(mockClient, host, request, scope, mockExecChain, mockCacheEntry);
verify(mockCacheEntry).hasVariants();
verify(mockSchedulingStrategy).schedule(0);
verify(mockExecutorService).schedule(Mockito.<Runnable>any(), Mockito.eq(1L), Mockito.eq(TimeUnit.SECONDS));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
}
@Test
public void testMarkCompleteRemovesIdentifier() {
when(mockCacheEntry.hasVariants()).thenReturn(false);
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(3));
impl.revalidateCacheEntry(mockClient, host, request, scope, mockExecChain, mockCacheEntry);
verify(mockCacheEntry).hasVariants();
verify(mockSchedulingStrategy).schedule(0);
verify(mockExecutorService).schedule(Mockito.<Runnable>any(), Mockito.eq(3L), Mockito.eq(TimeUnit.SECONDS));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
final String cacheKey = CacheKeyGenerator.INSTANCE.generateVariantURI(host, request, mockCacheEntry);
Assert.assertTrue(impl.getScheduledIdentifiers().contains(cacheKey));
impl.markComplete(cacheKey);
Assert.assertEquals(0, impl.getScheduledIdentifiers().size());
}
@Test
public void testRevalidateCacheEntryDoesNotPopulateIdentifierOnRejectedExecutionException() {
when(mockCacheEntry.hasVariants()).thenReturn(false);
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(2));
doThrow(new RejectedExecutionException()).when(mockExecutorService).schedule(Mockito.<Runnable>any(), Mockito.anyLong(), Mockito.<TimeUnit>any());
impl.revalidateCacheEntry(mockClient, host, request, scope, mockExecChain, mockCacheEntry);
verify(mockCacheEntry).hasVariants();
Assert.assertEquals(0, impl.getScheduledIdentifiers().size());
verify(mockExecutorService).schedule(Mockito.<Runnable>any(), Mockito.eq(2L), Mockito.eq(TimeUnit.SECONDS));
}
@Test
public void testRevalidateCacheEntryProperlyCollapsesRequest() {
when(mockCacheEntry.hasVariants()).thenReturn(false);
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(2));
impl.revalidateCacheEntry(mockClient, host, request, scope, mockExecChain, mockCacheEntry);
impl.revalidateCacheEntry(mockClient, host, request, scope, mockExecChain, mockCacheEntry);
verify(mockCacheEntry, Mockito.times(2)).hasVariants();
verify(mockSchedulingStrategy).schedule(Mockito.anyInt());
verify(mockExecutorService).schedule(Mockito.<Runnable>any(), Mockito.eq(2L), Mockito.eq(TimeUnit.SECONDS));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
}
@Test
public void testVariantsBothRevalidated() {
final ClassicHttpRequest req1 = new HttpGet("/");
req1.addHeader(new BasicHeader("Accept-Encoding", "identity"));
final ClassicHttpRequest req2 = new HttpGet("/");
req2.addHeader(new BasicHeader("Accept-Encoding", "gzip"));
final Header[] variantHeaders = new Header[] {
new BasicHeader(HeaderConstants.VARY, "Accept-Encoding")
};
when(mockCacheEntry.hasVariants()).thenReturn(true);
when(mockCacheEntry.headerIterator(HeaderConstants.VARY)).thenReturn(
new BasicHeaderIterator(variantHeaders, HeaderConstants.VARY));
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(2));
impl.revalidateCacheEntry(mockClient, host, req1, new ExecChain.Scope("test", route, req1, mockEndpoint, context),
mockExecChain, mockCacheEntry);
impl.revalidateCacheEntry(mockClient, host, req2, new ExecChain.Scope("test", route, req2, mockEndpoint, context),
mockExecChain, mockCacheEntry);
verify(mockCacheEntry, Mockito.times(2)).hasVariants();
verify(mockCacheEntry, Mockito.times(2)).headerIterator(HeaderConstants.VARY);
verify(mockSchedulingStrategy, Mockito.times(2)).schedule(Mockito.anyInt());
verify(mockExecutorService, Mockito.times(2)).schedule(Mockito.<Runnable>any(), Mockito.eq(2L), Mockito.eq(TimeUnit.SECONDS));
Assert.assertEquals(2, impl.getScheduledIdentifiers().size());
}
@Test
public void testShutdown() throws Exception {
impl.close();
impl.awaitTermination(Timeout.ofMinutes(2));
verify(mockExecutorService).shutdown();
verify(mockExecutorService).awaitTermination(2L, TimeUnit.MINUTES);
}
}

View File

@ -0,0 +1,154 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.RejectedExecutionException;
import org.apache.hc.client5.http.cache.HeaderConstants;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TestCacheRevalidatorBase {
@Mock
private SchedulingStrategy mockSchedulingStrategy;
@Mock
private CacheRevalidatorBase.ScheduledExecutor mockScheduledExecutor;
@Mock
private Runnable mockOperation;
private CacheRevalidatorBase impl;
@Before
public void setUp() {
impl = new CacheRevalidatorBase(mockScheduledExecutor, mockSchedulingStrategy);
}
@Test
public void testRevalidateCacheEntrySchedulesExecutionAndPopulatesIdentifier() {
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(1));
final String cacheKey = "blah";
impl.scheduleRevalidation(cacheKey, mockOperation);
verify(mockSchedulingStrategy).schedule(0);
verify(mockScheduledExecutor).schedule(Mockito.same(mockOperation), Mockito.eq(TimeValue.ofSeconds(1)));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
}
@Test
public void testMarkCompleteRemovesIdentifier() {
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(3));
final String cacheKey = "blah";
impl.scheduleRevalidation(cacheKey, mockOperation);
verify(mockSchedulingStrategy).schedule(0);
verify(mockScheduledExecutor).schedule(Mockito.<Runnable>any(), Mockito.eq(TimeValue.ofSeconds(3)));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
Assert.assertTrue(impl.getScheduledIdentifiers().contains(cacheKey));
impl.jobSuccessful(cacheKey);
Assert.assertEquals(0, impl.getScheduledIdentifiers().size());
}
@Test
public void testRevalidateCacheEntryDoesNotPopulateIdentifierOnRejectedExecutionException() {
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(2));
doThrow(new RejectedExecutionException()).when(mockScheduledExecutor).schedule(Mockito.<Runnable>any(), Mockito.<TimeValue>any());
final String cacheKey = "blah";
impl.scheduleRevalidation(cacheKey, mockOperation);
Assert.assertEquals(0, impl.getScheduledIdentifiers().size());
verify(mockScheduledExecutor).schedule(Mockito.<Runnable>any(), Mockito.eq(TimeValue.ofSeconds(2)));
}
@Test
public void testRevalidateCacheEntryProperlyCollapsesRequest() {
when(mockSchedulingStrategy.schedule(Mockito.anyInt())).thenReturn(TimeValue.ofSeconds(2));
final String cacheKey = "blah";
impl.scheduleRevalidation(cacheKey, mockOperation);
impl.scheduleRevalidation(cacheKey, mockOperation);
impl.scheduleRevalidation(cacheKey, mockOperation);
verify(mockSchedulingStrategy).schedule(Mockito.anyInt());
verify(mockScheduledExecutor).schedule(Mockito.<Runnable>any(), Mockito.eq(TimeValue.ofSeconds(2)));
Assert.assertEquals(1, impl.getScheduledIdentifiers().size());
}
@Test
public void testStaleResponse() {
final HttpResponse response1 = new BasicHttpResponse(HttpStatus.SC_OK);
response1.addHeader(HeaderConstants.WARNING, "110 localhost \"Response is stale\"");
Assert.assertThat(impl.isStale(response1), CoreMatchers.equalTo(true));
final HttpResponse response2 = new BasicHttpResponse(HttpStatus.SC_OK);
response2.addHeader(HeaderConstants.WARNING, "111 localhost \"Revalidation failed\"");
Assert.assertThat(impl.isStale(response2), CoreMatchers.equalTo(true));
final HttpResponse response3 = new BasicHttpResponse(HttpStatus.SC_OK);
response3.addHeader(HeaderConstants.WARNING, "xxx localhost \"Huh?\"");
Assert.assertThat(impl.isStale(response3), CoreMatchers.equalTo(false));
final HttpResponse response4 = new BasicHttpResponse(HttpStatus.SC_OK);
Assert.assertThat(impl.isStale(response4), CoreMatchers.equalTo(false));
}
@Test
public void testShutdown() throws Exception {
impl.close();
impl.awaitTermination(Timeout.ofMinutes(2));
verify(mockScheduledExecutor).shutdown();
verify(mockScheduledExecutor).awaitTermination(Timeout.ofMinutes(2));
}
}