Replaced FailureCache by generic ConcurrentCountMap

This commit is contained in:
Oleg Kalnichevski 2018-01-05 15:09:05 +01:00
parent 0561bacc66
commit 79b76030fd
6 changed files with 101 additions and 315 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.impl.schedule.ImmediateSchedulingStrategy;
import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.HttpHost;
@ -57,7 +58,7 @@ class AsynchronousValidator implements Closeable {
private final SchedulingStrategy schedulingStrategy;
private final Set<String> queued;
private final CacheKeyGenerator cacheKeyGenerator;
private final FailureCache failureCache;
private final ConcurrentCountMap<String> failureCache;
private final Logger log = LogManager.getLogger(getClass());
@ -87,7 +88,7 @@ class AsynchronousValidator implements Closeable {
this.schedulingStrategy = schedulingStrategy;
this.queued = new HashSet<>();
this.cacheKeyGenerator = CacheKeyGenerator.INSTANCE;
this.failureCache = new DefaultFailureCache();
this.failureCache = new ConcurrentCountMap<>();
}
/**
@ -104,7 +105,7 @@ class AsynchronousValidator implements Closeable {
final String cacheKey = cacheKeyGenerator.generateVariantURI(target, request, entry);
if (!queued.contains(cacheKey)) {
final int consecutiveFailedAttempts = failureCache.getErrorCount(cacheKey);
final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
final AsynchronousValidationRequest revalidationRequest =
new AsynchronousValidationRequest(
this, cachingExec, target, request, scope, chain, entry, cacheKey, consecutiveFailedAttempts);
@ -147,7 +148,7 @@ class AsynchronousValidator implements Closeable {
* @param identifier the revalidation job's unique identifier
*/
void jobSuccessful(final String identifier) {
failureCache.resetErrorCount(identifier);
failureCache.resetCount(identifier);
}
/**
@ -157,10 +158,11 @@ class AsynchronousValidator implements Closeable {
* @param identifier the revalidation job's unique identifier
*/
void jobFailed(final String identifier) {
failureCache.increaseErrorCount(identifier);
failureCache.increaseCount(identifier);
}
Set<String> getScheduledIdentifiers() {
return Collections.unmodifiableSet(queued);
}
}

View File

@ -1,147 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
/**
* Implements a bounded failure cache. The oldest entries are discarded when
* the maximum size is exceeded.
*
* @since 4.3
*/
@Contract(threading = ThreadingBehavior.SAFE)
public class DefaultFailureCache implements FailureCache {
static final int DEFAULT_MAX_SIZE = 1000;
static final int MAX_UPDATE_TRIES = 10;
private final int maxSize;
private final ConcurrentMap<String, FailureCacheValue> storage;
/**
* Create a new failure cache with the maximum size of
* {@link #DEFAULT_MAX_SIZE}.
*/
public DefaultFailureCache() {
this(DEFAULT_MAX_SIZE);
}
/**
* Creates a new failure cache with the specified maximum size.
* @param maxSize the maximum number of entries the cache should store
*/
public DefaultFailureCache(final int maxSize) {
this.maxSize = maxSize;
this.storage = new ConcurrentHashMap<>();
}
@Override
public int getErrorCount(final String identifier) {
if (identifier == null) {
throw new IllegalArgumentException("identifier may not be null");
}
final FailureCacheValue storedErrorCode = storage.get(identifier);
return storedErrorCode != null ? storedErrorCode.getErrorCount() : 0;
}
@Override
public void resetErrorCount(final String identifier) {
if (identifier == null) {
throw new IllegalArgumentException("identifier may not be null");
}
storage.remove(identifier);
}
@Override
public void increaseErrorCount(final String identifier) {
if (identifier == null) {
throw new IllegalArgumentException("identifier may not be null");
}
updateValue(identifier);
removeOldestEntryIfMapSizeExceeded();
}
private void updateValue(final String identifier) {
/**
* Due to concurrency it is possible that someone else is modifying an
* entry before we could write back our updated value. So we keep
* trying until it is our turn.
*
* In case there is a lot of contention on that identifier, a thread
* might starve. Thus it gives up after a certain number of failed
* processChallenge tries.
*/
for (int i = 0; i < MAX_UPDATE_TRIES; i++) {
final FailureCacheValue oldValue = storage.get(identifier);
if (oldValue == null) {
final FailureCacheValue newValue = new FailureCacheValue(identifier, 1);
if (storage.putIfAbsent(identifier, newValue) == null) {
return;
}
}
else {
final int errorCount = oldValue.getErrorCount();
if (errorCount == Integer.MAX_VALUE) {
return;
}
final FailureCacheValue newValue = new FailureCacheValue(identifier, errorCount + 1);
if (storage.replace(identifier, oldValue, newValue)) {
return;
}
}
}
}
private void removeOldestEntryIfMapSizeExceeded() {
if (storage.size() > maxSize) {
final FailureCacheValue valueWithOldestTimestamp = findValueWithOldestTimestamp();
if (valueWithOldestTimestamp != null) {
storage.remove(valueWithOldestTimestamp.getKey(), valueWithOldestTimestamp);
}
}
}
private FailureCacheValue findValueWithOldestTimestamp() {
long oldestTimestamp = Long.MAX_VALUE;
FailureCacheValue oldestValue = null;
for (final Map.Entry<String, FailureCacheValue> storageEntry : storage.entrySet()) {
final FailureCacheValue value = storageEntry.getValue();
final long creationTimeInNanos = value.getCreationTimeInNanos();
if (creationTimeInNanos < oldestTimestamp) {
oldestTimestamp = creationTimeInNanos;
oldestValue = storageEntry.getValue();
}
}
return oldestValue;
}
}

View File

@ -1,68 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
/**
* The error count with a creation timestamp and its associated key.
*
* @since 4.3
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class FailureCacheValue {
private final long creationTimeInNanos;
private final String key;
private final int errorCount;
public FailureCacheValue(final String key, final int errorCount) {
this.creationTimeInNanos = System.nanoTime();
this.key = key;
this.errorCount = errorCount;
}
public long getCreationTimeInNanos() {
return creationTimeInNanos;
}
public String getKey()
{
return key;
}
public int getErrorCount() {
return errorCount;
}
@Override
public String toString() {
return "[entry creationTimeInNanos=" + creationTimeInNanos + "; " +
"key=" + key + "; errorCount=" + errorCount + ']';
}
}

View File

@ -0,0 +1,75 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.schedule;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.util.Args;
@Contract(threading = ThreadingBehavior.SAFE)
public final class ConcurrentCountMap<T> {
private final ConcurrentMap<T, AtomicInteger> map;
public ConcurrentCountMap() {
this.map = new ConcurrentHashMap<>();
}
public int getCount(final T identifier) {
Args.notNull(identifier, "Identifier");
final AtomicInteger count = map.get(identifier);
return count != null ? count.get() : 0;
}
public void resetCount(final T identifier) {
Args.notNull(identifier, "Identifier");
map.remove(identifier);
}
public int increaseCount(final T identifier) {
Args.notNull(identifier, "Identifier");
final AtomicInteger count = get(identifier);
return count.incrementAndGet();
}
private AtomicInteger get(final T identifier) {
AtomicInteger entry = map.get(identifier);
if (entry == null) {
final AtomicInteger newEntry = new AtomicInteger();
entry = map.putIfAbsent(identifier, newEntry);
if (entry == null) {
entry = newEntry;
}
}
return entry;
}
}

View File

@ -1,69 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import org.junit.Assert;
import org.junit.Test;
public class TestDefaultFailureCache
{
private static final String IDENTIFIER = "some-identifier";
private FailureCache failureCache = new DefaultFailureCache();
@Test
public void testResetErrorCount() {
failureCache.increaseErrorCount(IDENTIFIER);
failureCache.resetErrorCount(IDENTIFIER);
final int errorCount = failureCache.getErrorCount(IDENTIFIER);
Assert.assertEquals(0, errorCount);
}
@Test
public void testIncrementErrorCount() {
failureCache.increaseErrorCount(IDENTIFIER);
failureCache.increaseErrorCount(IDENTIFIER);
failureCache.increaseErrorCount(IDENTIFIER);
final int errorCount = failureCache.getErrorCount(IDENTIFIER);
Assert.assertEquals(3, errorCount);
}
@Test
public void testMaxSize() {
failureCache = new DefaultFailureCache(3);
failureCache.increaseErrorCount("a");
failureCache.increaseErrorCount("b");
failureCache.increaseErrorCount("c");
failureCache.increaseErrorCount("d");
final int errorCount = failureCache.getErrorCount("a");
Assert.assertEquals(0, errorCount);
}
}

View File

@ -24,34 +24,27 @@
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
package org.apache.hc.client5.http.schedule;
/**
* Increase and reset the number of errors associated with a specific
* identifier.
*
* @since 4.3
*/
public interface FailureCache {
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
/**
* Get the current error count.
* @param identifier the identifier for which the error count is requested
* @return the currently known error count or zero if there is no record
*/
int getErrorCount(String identifier);
public class TestConcurrentCountMap
{
/**
* Reset the error count back to zero.
* @param identifier the identifier for which the error count should be
* reset
*/
void resetErrorCount(String identifier);
private static final String IDENTIFIER = "some-identifier";
private ConcurrentCountMap<String> map = new ConcurrentCountMap<>();
@Test
public void testBasics() {
map.increaseCount(IDENTIFIER);
map.increaseCount(IDENTIFIER);
Assert.assertThat(map.getCount(IDENTIFIER), CoreMatchers.equalTo(2));
map.resetCount(IDENTIFIER);
Assert.assertThat(map.getCount(IDENTIFIER), CoreMatchers.equalTo(0));
}
/**
* Increases the error count by one.
* @param identifier the identifier for which the error count should be
* increased
*/
void increaseErrorCount(String identifier);
}