HTTPCLIENT-1824, HTTPCLIENT-1868: Asynchronous HTTP cache storage API; Memcached backend implementation of async HTTP cache storage

This commit is contained in:
Oleg Kalnichevski 2017-12-20 10:55:32 +01:00
parent ebcb55d641
commit 002f40f9d3
9 changed files with 1245 additions and 18 deletions

View File

@ -0,0 +1,90 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.cache;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
/**
* {@literal HttpAsyncCacheStorage} represents an abstract HTTP cache
* storage backend that can then be plugged into the asynchronous
* (non-blocking ) request execution
* pipeline.
*
* @since 5.0
*/
public interface HttpAsyncCacheStorage {
Cancellable NOOP_CANCELLABLE = new Cancellable() {
@Override
public boolean cancel() {
return false;
}
};
/**
* Store a given cache entry under the given key.
* @param key where in the cache to store the entry
* @param entry cached response to store
* @param callback result callback
*/
Cancellable putEntry(
String key, HttpCacheEntry entry, FutureCallback<Boolean> callback);
/**
* Retrieves the cache entry stored under the given key
* or null if no entry exists under that key.
* @param key cache key
* @param callback result callback
* @return an {@link HttpCacheEntry} or {@code null} if no
* entry exists
*/
Cancellable getEntry(
String key, FutureCallback<HttpCacheEntry> callback);
/**
* Deletes/invalidates/removes any cache entries currently
* stored under the given key.
* @param key
* @param callback result callback
*/
Cancellable removeEntry(
String key, FutureCallback<Boolean> callback);
/**
* Atomically applies the given callback to processChallenge an existing cache
* entry under a given key.
* @param key indicates which entry to modify
* @param casOperation the CAS operation to perform.
* @param callback result callback
*/
Cancellable updateEntry(
String key, HttpCacheCASOperation casOperation, FutureCallback<Boolean> callback);
}

View File

@ -0,0 +1,98 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.cache;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.util.Args;
/**
* {@link HttpAsyncCacheStorage} implementation that emulates asynchronous
* behavior using an instance of classic {@link HttpCacheStorage}.
*
* @since 5.0
*/
public final class HttpAsyncCacheStorageAdaptor implements HttpAsyncCacheStorage {
private final HttpCacheStorage cacheStorage;
public HttpAsyncCacheStorageAdaptor(final HttpCacheStorage cacheStorage) {
this.cacheStorage = Args.notNull(cacheStorage, "Cache strorage");
}
public Cancellable putEntry(final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
Args.notEmpty(key, "Key");
Args.notNull(entry, "Cache ehtry");
Args.notNull(callback, "Callback");
try {
cacheStorage.putEntry(key, entry);
callback.completed(Boolean.TRUE);
} catch (final Exception ex) {
callback.failed(ex);
}
return NOOP_CANCELLABLE;
}
public Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
Args.notEmpty(key, "Key");
Args.notNull(callback, "Callback");
try {
final HttpCacheEntry entry = cacheStorage.getEntry(key);
callback.completed(entry);
} catch (final Exception ex) {
callback.failed(ex);
}
return NOOP_CANCELLABLE;
}
public Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
Args.notEmpty(key, "Key");
Args.notNull(callback, "Callback");
try {
cacheStorage.removeEntry(key);
callback.completed(Boolean.TRUE);
} catch (final Exception ex) {
callback.failed(ex);
}
return NOOP_CANCELLABLE;
}
public Cancellable updateEntry(
final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
Args.notEmpty(key, "Key");
Args.notNull(casOperation, "CAS operation");
Args.notNull(callback, "Callback");
try {
cacheStorage.updateEntry(key, casOperation);
callback.completed(Boolean.TRUE);
} catch (final Exception ex) {
callback.failed(ex);
}
return NOOP_CANCELLABLE;
}
}

View File

@ -27,9 +27,9 @@
package org.apache.hc.client5.http.cache; package org.apache.hc.client5.http.cache;
/** /**
* New storage backends should implement this {@link HttpCacheStorage} * {@literal HttpCacheStorage} represents an abstract HTTP cache
* interface. They can then be plugged into the existing caching * storage backend that can then be plugged into the classic
* {@link org.apache.hc.client5.http.classic.HttpClient} implementation. * (blocking) request execution pipeline.
* *
* @since 4.1 * @since 4.1
*/ */

View File

@ -0,0 +1,46 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
/**
* Abstract cache backend for serialized binary objects capable of CAS (compare-and-swap) updates.
*
* @since 5.0
*/
public abstract class AbstractBinaryAsyncCacheStorage<CAS> extends AbstractSerializingAsyncCacheStorage<byte[], CAS> {
public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<byte[]> serializer) {
super(maxUpdateRetries, serializer);
}
public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries) {
super(maxUpdateRetries, ByteArrayCacheEntrySerializer.INSTANCE);
}
}

View File

@ -0,0 +1,228 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.util.Args;
/**
* Abstract cache backend for serialized objects capable of CAS (compare-and-swap) updates.
*
* @since 5.0
*/
public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements HttpAsyncCacheStorage {
private final int maxUpdateRetries;
private final HttpCacheEntrySerializer<T> serializer;
public AbstractSerializingAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer<T> serializer) {
this.maxUpdateRetries = Args.notNegative(maxUpdateRetries, "Max retries");
this.serializer = Args.notNull(serializer, "Cache entry serializer");
}
protected abstract String digestToStorageKey(String key);
protected abstract T getStorageObject(CAS cas) throws ResourceIOException;
protected abstract Cancellable store(String storageKey, T storageObject, FutureCallback<Boolean> callback);
protected abstract Cancellable restore(String storageKey, FutureCallback<T> callback);
protected abstract Cancellable getForUpdateCAS(String storageKey, FutureCallback<CAS> callback);
protected abstract Cancellable updateCAS(String storageKey, CAS cas, T storageObject, FutureCallback<Boolean> callback);
protected abstract Cancellable delete(String storageKey, FutureCallback<Boolean> callback);
@Override
public final Cancellable putEntry(
final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
Args.notNull(key, "Storage key");
Args.notNull(callback, "Callback");
try {
final String storageKey = digestToStorageKey(key);
final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, entry));
return store(storageKey, storageObject, callback);
} catch (final Exception ex) {
callback.failed(ex);
return NOOP_CANCELLABLE;
}
}
@Override
public final Cancellable getEntry(final String key, final FutureCallback<HttpCacheEntry> callback) {
Args.notNull(key, "Storage key");
Args.notNull(callback, "Callback");
try {
final String storageKey = digestToStorageKey(key);
return restore(storageKey, new FutureCallback<T>() {
@Override
public void completed(final T storageObject) {
try {
if (storageObject != null) {
final HttpCacheStorageEntry entry = serializer.deserialize(storageObject);
if (key.equals(entry.getKey())) {
callback.completed(entry.getContent());
} else {
callback.completed(null);
}
} else {
callback.completed(null);
}
} catch (final Exception ex) {
callback.failed(ex);
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
} catch (final Exception ex) {
callback.failed(ex);
return NOOP_CANCELLABLE;
}
}
@Override
public final Cancellable removeEntry(final String key, final FutureCallback<Boolean> callback) {
Args.notNull(key, "Storage key");
Args.notNull(callback, "Callback");
try {
final String storageKey = digestToStorageKey(key);
return delete(storageKey, callback);
} catch (final Exception ex) {
callback.failed(ex);
return NOOP_CANCELLABLE;
}
}
@Override
public final Cancellable updateEntry(
final String key, final HttpCacheCASOperation casOperation, final FutureCallback<Boolean> callback) {
Args.notNull(key, "Storage key");
Args.notNull(casOperation, "CAS operation");
Args.notNull(callback, "Callback");
final ComplexCancellable complexCancellable = new ComplexCancellable();
final AtomicInteger count = new AtomicInteger(0);
atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback);
return complexCancellable;
}
private void atemmptUpdateEntry(
final String key,
final HttpCacheCASOperation casOperation,
final ComplexCancellable complexCancellable,
final AtomicInteger count,
final FutureCallback<Boolean> callback) {
try {
final String storageKey = digestToStorageKey(key);
complexCancellable.setDependency(getForUpdateCAS(storageKey, new FutureCallback<CAS>() {
@Override
public void completed(final CAS cas) {
try {
HttpCacheStorageEntry storageEntry = cas != null ? serializer.deserialize(getStorageObject(cas)) : null;
if (storageEntry != null && !key.equals(storageEntry.getKey())) {
storageEntry = null;
}
final HttpCacheEntry existingEntry = storageEntry != null ? storageEntry.getContent() : null;
final HttpCacheEntry updatedEntry = casOperation.execute(existingEntry);
if (existingEntry == null) {
putEntry(key, updatedEntry, callback);
} else {
final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, updatedEntry));
complexCancellable.setDependency(updateCAS(storageKey, cas, storageObject, new FutureCallback<Boolean>() {
@Override
public void completed(final Boolean result) {
if (result) {
callback.completed(result);
} else {
if (!complexCancellable.isCancelled()) {
final int numRetries = count.incrementAndGet();
if (numRetries >= maxUpdateRetries) {
callback.failed(new HttpCacheUpdateException("Cache update failed after " + numRetries + " retries"));
} else {
atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback);
}
}
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
}));
}
} catch (final Exception ex) {
callback.failed(ex);
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
}));
} catch (final Exception ex) {
callback.failed(ex);
}
}
}

View File

@ -0,0 +1,74 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.util.Args;
/**
* TODO: replace with ComplexCancellable from HttpCore 5.0b2
*/
final class ComplexCancellable implements Cancellable {
private final AtomicReference<Cancellable> dependencyRef;
private final AtomicBoolean cancelled;
public ComplexCancellable() {
this.dependencyRef = new AtomicReference<>(null);
this.cancelled = new AtomicBoolean(false);
}
public boolean isCancelled() {
return cancelled.get();
}
public void setDependency(final Cancellable dependency) {
Args.notNull(dependency, "dependency");
if (!cancelled.get()) {
dependencyRef.set(dependency);
} else {
dependency.cancel();
}
}
@Override
public boolean cancel() {
if (cancelled.compareAndSet(false, true)) {
final Cancellable dependency = dependencyRef.getAndSet(null);
if (dependency != null) {
dependency.cancel();
}
return true;
} else {
return false;
}
}
}

View File

@ -0,0 +1,250 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache.memcached;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage;
import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
import org.apache.hc.client5.http.impl.cache.CacheConfig;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.util.Args;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.GetCompletionListener;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
/**
* <p>
* This class is a storage backend that uses an external <i>memcached</i>
* for storing cached origin responses. This storage option provides a
* couple of interesting advantages over the default in-memory storage
* backend:
* </p>
* <ol>
* <li>in-memory cached objects can survive an application restart since
* they are held in a separate process</li>
* <li>it becomes possible for several cooperating applications to share
* a large <i>memcached</i> farm together</li>
* </ol>
* <p>
* Note that in a shared memcached pool setting you may wish to make use
* of the Ketama consistent hashing algorithm to reduce the number of
* cache misses that might result if one of the memcached cluster members
* fails (see the <a href="http://dustin.github.com/java-memcached-client/apidocs/net/spy/memcached/KetamaConnectionFactory.html">
* KetamaConnectionFactory</a>).
* </p>
* <p>
* Because memcached places limits on the size of its keys, we need to
* introduce a key hashing scheme to map the annotated URLs the higher-level
* caching HTTP client wants to use as keys onto ones that are suitable
* for use with memcached. Please see {@link KeyHashingScheme} if you would
* like to use something other than the provided {@link SHA256KeyHashingScheme}.
* </p>
*
* <p>
* Please refer to the <a href="http://code.google.com/p/memcached/wiki/NewStart">
* memcached documentation</a> and in particular to the documentation for
* the <a href="http://code.google.com/p/spymemcached/">spymemcached
* documentation</a> for details about how to set up and configure memcached
* and the Java client used here, respectively.
* </p>
*
* @since 5.0
*/
public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage<CASValue<Object>> {
private final MemcachedClient client;
private final KeyHashingScheme keyHashingScheme;
/**
* Create a storage backend talking to a <i>memcached</i> instance
* listening on the specified host and port. This is useful if you
* just have a single local memcached instance running on the same
* machine as your application, for example.
* @param address where the <i>memcached</i> daemon is running
* @throws IOException in case of an error
*/
public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException {
this(new MemcachedClient(address));
}
/**
* Create a storage backend using the pre-configured given
* <i>memcached</i> client.
* @param cache client to use for communicating with <i>memcached</i>
*/
public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) {
this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
}
/**
* Create a storage backend using the given <i>memcached</i> client and
* applying the given cache configuration, serialization, and hashing
* mechanisms.
* @param client how to talk to <i>memcached</i>
* @param config apply HTTP cache-related options
* @param serializer alternative serialization mechanism
* @param keyHashingScheme how to map higher-level logical "storage keys"
* onto "cache keys" suitable for use with memcached
*/
public MemcachedHttpAsyncCacheStorage(
final MemcachedClient client,
final CacheConfig config,
final HttpCacheEntrySerializer<byte[]> serializer,
final KeyHashingScheme keyHashingScheme) {
super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(),
serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE);
this.client = Args.notNull(client, "Memcached client");
this.keyHashingScheme = keyHashingScheme;
}
@Override
protected String digestToStorageKey(final String key) {
return keyHashingScheme.hash(key);
}
private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
if (storageObject == null) {
return null;
}
if (storageObject instanceof byte[]) {
return (byte[]) storageObject;
} else {
throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass());
}
}
@Override
protected byte[] getStorageObject(final CASValue<Object> casValue) throws ResourceIOException {
return castAsByteArray(casValue.getValue());
}
private <T> Cancellable operation(final OperationFuture<T> operationFuture, final FutureCallback<T> callback) {
operationFuture.addListener(new OperationCompletionListener() {
@Override
public void onComplete(final OperationFuture<?> future) throws Exception {
try {
callback.completed(operationFuture.get());
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof Exception) {
callback.failed((Exception) ex.getCause());
} else {
callback.failed(ex);
}
}
}
});
return new Cancellable() {
@Override
public boolean cancel() {
return operationFuture.cancel();
}
};
}
@Override
protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback<Boolean> callback) {
return operation(client.set(storageKey, 0, storageObject), callback);
}
@Override
protected Cancellable restore(final String storageKey, final FutureCallback<byte[]> callback) {
final GetFuture<Object> getFuture = client.asyncGet(storageKey);
getFuture.addListener(new GetCompletionListener() {
@Override
public void onComplete(final GetFuture<?> future) throws Exception {
try {
callback.completed(castAsByteArray(getFuture.get()));
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof Exception) {
callback.failed((Exception) ex.getCause());
} else {
callback.failed(ex);
}
}
}
});
return new Cancellable() {
@Override
public boolean cancel() {
return getFuture.cancel(true);
}
};
}
@Override
protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback<CASValue<Object>> callback) {
return operation(client.asyncGets(storageKey), callback);
}
@Override
protected Cancellable updateCAS(
final String storageKey, final CASValue<Object> casValue, final byte[] storageObject, final FutureCallback<Boolean> callback) {
return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback<CASResponse>() {
@Override
public void completed(final CASResponse result) {
callback.completed(result == CASResponse.OK);
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
}
@Override
protected Cancellable delete(final String storageKey, final FutureCallback<Boolean> callback) {
return operation(client.delete(storageKey), callback);
}
}

View File

@ -32,14 +32,13 @@ import java.net.InetSocketAddress;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer; import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.ResourceIOException; import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.client5.http.impl.cache.AbstractBinaryCacheStorage; import org.apache.hc.client5.http.impl.cache.AbstractBinaryCacheStorage;
import org.apache.hc.client5.http.impl.cache.CacheConfig;
import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer; import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer;
import org.apache.hc.client5.http.impl.cache.CacheConfig;
import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Args;
import net.spy.memcached.CASResponse; import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue; import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.OperationTimeoutException; import net.spy.memcached.OperationTimeoutException;
/** /**
@ -82,7 +81,7 @@ import net.spy.memcached.OperationTimeoutException;
*/ */
public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASValue<Object>> { public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASValue<Object>> {
private final MemcachedClientIF client; private final MemcachedClient client;
private final KeyHashingScheme keyHashingScheme; private final KeyHashingScheme keyHashingScheme;
/** /**
@ -102,7 +101,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
* <i>memcached</i> client. * <i>memcached</i> client.
* @param cache client to use for communicating with <i>memcached</i> * @param cache client to use for communicating with <i>memcached</i>
*/ */
public MemcachedHttpCacheStorage(final MemcachedClientIF cache) { public MemcachedHttpCacheStorage(final MemcachedClient cache) {
this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE); this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE);
} }
@ -117,7 +116,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
* onto "cache keys" suitable for use with memcached * onto "cache keys" suitable for use with memcached
*/ */
public MemcachedHttpCacheStorage( public MemcachedHttpCacheStorage(
final MemcachedClientIF client, final MemcachedClient client,
final CacheConfig config, final CacheConfig config,
final HttpCacheEntrySerializer<byte[]> serializer, final HttpCacheEntrySerializer<byte[]> serializer,
final KeyHashingScheme keyHashingScheme) { final KeyHashingScheme keyHashingScheme) {
@ -134,11 +133,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
@Override @Override
protected void store(final String storageKey, final byte[] storageObject) throws ResourceIOException { protected void store(final String storageKey, final byte[] storageObject) throws ResourceIOException {
try {
client.set(storageKey, 0, storageObject); client.set(storageKey, 0, storageObject);
} catch (final OperationTimeoutException ex) {
throw new MemcachedOperationTimeoutException(ex);
}
} }
private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException { private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException {
@ -184,11 +179,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
@Override @Override
protected void delete(final String storageKey) throws ResourceIOException { protected void delete(final String storageKey) throws ResourceIOException {
try {
client.delete(storageKey); client.delete(storageKey);
} catch (final OperationTimeoutException ex) {
throw new MemcachedOperationTimeoutException(ex);
}
} }
} }

View File

@ -0,0 +1,450 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class TestAbstractSerializingAsyncCacheStorage {
@Mock
private Cancellable cancellable;
@Mock
private FutureCallback<Boolean> operationCallback;
@Mock
private FutureCallback<HttpCacheEntry> cacheEntryCallback;
private AbstractBinaryAsyncCacheStorage<String> impl;
public static byte[] serialize(final String key, final HttpCacheEntry value) throws ResourceIOException {
return ByteArrayCacheEntrySerializer.INSTANCE.serialize(new HttpCacheStorageEntry(key, value));
}
@Before
@SuppressWarnings("unchecked")
public void setUp() {
impl = Mockito.mock(AbstractBinaryAsyncCacheStorage.class,
Mockito.withSettings().defaultAnswer(Answers.CALLS_REAL_METHODS).useConstructor(3));
}
@Test
public void testCachePut() throws Exception {
final String key = "foo";
final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.store(
Mockito.eq("bar"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(2);
callback.completed(true);
return cancellable;
}
});
impl.putEntry(key, value, operationCallback);
final ArgumentCaptor<byte[]> argumentCaptor = ArgumentCaptor.forClass(byte[].class);
Mockito.verify(impl).store(Mockito.eq("bar"), argumentCaptor.capture(), Mockito.<FutureCallback<Boolean>>any());
Assert.assertArrayEquals(serialize(key, value), argumentCaptor.getValue());
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testCacheGetNullEntry() throws Exception {
final String key = "foo";
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<byte[]> callback = invocation.getArgument(1);
callback.completed(null);
return cancellable;
}
});
impl.getEntry(key, cacheEntryCallback);
final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue());
Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
}
@Test
public void testCacheGet() throws Exception {
final String key = "foo";
final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<byte[]> callback = invocation.getArgument(1);
callback.completed(serialize(key, value));
return cancellable;
}
});
impl.getEntry(key, cacheEntryCallback);
final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
final HttpCacheEntry resultingEntry = argumentCaptor.getValue();
Assert.assertThat(resultingEntry, HttpCacheEntryMatcher.equivalent(value));
Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
}
@Test
public void testCacheGetKeyMismatch() throws Exception {
final String key = "foo";
final HttpCacheEntry value = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<byte[]> callback = invocation.getArgument(1);
callback.completed(serialize("not-foo", value));
return cancellable;
}
});
impl.getEntry(key, cacheEntryCallback);
final ArgumentCaptor<HttpCacheEntry> argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class);
Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture());
Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue());
Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.<FutureCallback<byte[]>>any());
}
@Test
public void testCacheRemove() throws Exception{
final String key = "foo";
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.delete(
Mockito.eq("bar"),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(1);
callback.completed(true);
return cancellable;
}
});
impl.removeEntry(key, operationCallback);
Mockito.verify(impl).delete("bar", operationCallback);
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testCacheUpdateNullEntry() throws Exception {
final String key = "foo";
final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<byte[]> callback = invocation.getArgument(1);
callback.completed(null);
return cancellable;
}
});
Mockito.when(impl.store(
Mockito.eq("bar"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(2);
callback.completed(true);
return cancellable;
}
});
impl.updateEntry(key, new HttpCacheCASOperation() {
@Override
public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
Assert.assertThat(existing, CoreMatchers.nullValue());
return updatedValue;
}
}, operationCallback);
Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testCacheCASUpdate() throws Exception {
final String key = "foo";
final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<String> callback = invocation.getArgument(1);
callback.completed("stuff");
return cancellable;
}
});
Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
Mockito.when(impl.updateCAS(
Mockito.eq("bar"),
Mockito.eq("stuff"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(3);
callback.completed(true);
return cancellable;
}
});
impl.updateEntry(key, new HttpCacheCASOperation() {
@Override
public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
return updatedValue;
}
}, operationCallback);
Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
Mockito.verify(impl).getStorageObject("stuff");
Mockito.verify(impl).updateCAS(Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testCacheCASUpdateKeyMismatch() throws Exception {
final String key = "foo";
final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<String> callback = invocation.getArgument(1);
callback.completed("stuff");
return cancellable;
}
});
Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize("not-foo", existingValue));
Mockito.when(impl.store(
Mockito.eq("bar"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(2);
callback.completed(true);
return cancellable;
}
});
impl.updateEntry(key, new HttpCacheCASOperation() {
@Override
public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
Assert.assertThat(existing, CoreMatchers.nullValue());
return updatedValue;
}
}, operationCallback);
Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
Mockito.verify(impl).getStorageObject("stuff");
Mockito.verify(impl, Mockito.never()).updateCAS(
Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testSingleCacheUpdateRetry() throws Exception {
final String key = "foo";
final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<String> callback = invocation.getArgument(1);
callback.completed("stuff");
return cancellable;
}
});
Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
final AtomicInteger count = new AtomicInteger(0);
Mockito.when(impl.updateCAS(
Mockito.eq("bar"),
Mockito.eq("stuff"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(3);
if (count.incrementAndGet() == 1) {
callback.completed(false);
} else {
callback.completed(true);
}
return cancellable;
}
});
impl.updateEntry(key, new HttpCacheCASOperation() {
@Override
public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
return updatedValue;
}
}, operationCallback);
Mockito.verify(impl, Mockito.times(2)).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
Mockito.verify(impl, Mockito.times(2)).getStorageObject("stuff");
Mockito.verify(impl, Mockito.times(2)).updateCAS(
Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(operationCallback).completed(Boolean.TRUE);
}
@Test
public void testCacheUpdateFail() throws Exception {
final String key = "foo";
final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry();
Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar");
Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any())).thenAnswer(
new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<String> callback = invocation.getArgument(1);
callback.completed("stuff");
return cancellable;
}
});
Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue));
final AtomicInteger count = new AtomicInteger(0);
Mockito.when(impl.updateCAS(
Mockito.eq("bar"),
Mockito.eq("stuff"),
Mockito.<byte[]>any(),
Mockito.<FutureCallback<Boolean>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final FutureCallback<Boolean> callback = invocation.getArgument(3);
if (count.incrementAndGet() <= 3) {
callback.completed(false);
} else {
callback.completed(true);
}
return cancellable;
}
});
impl.updateEntry(key, new HttpCacheCASOperation() {
@Override
public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException {
return updatedValue;
}
}, operationCallback);
Mockito.verify(impl, Mockito.times(3)).getForUpdateCAS(Mockito.eq("bar"), Mockito.<FutureCallback<String>>any());
Mockito.verify(impl, Mockito.times(3)).getStorageObject("stuff");
Mockito.verify(impl, Mockito.times(3)).updateCAS(
Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any(), Mockito.<FutureCallback<Boolean>>any());
Mockito.verify(operationCallback).failed(Mockito.<HttpCacheUpdateException>any());
}
}