diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java new file mode 100644 index 000000000..4ef0ce7e0 --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorage.java @@ -0,0 +1,90 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.cache; + +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureCallback; + +/** + * {@literal HttpAsyncCacheStorage} represents an abstract HTTP cache + * storage backend that can then be plugged into the asynchronous + * (non-blocking ) request execution + * pipeline. + * + * @since 5.0 + */ +public interface HttpAsyncCacheStorage { + + Cancellable NOOP_CANCELLABLE = new Cancellable() { + + @Override + public boolean cancel() { + return false; + } + + }; + + /** + * Store a given cache entry under the given key. + * @param key where in the cache to store the entry + * @param entry cached response to store + * @param callback result callback + */ + Cancellable putEntry( + String key, HttpCacheEntry entry, FutureCallback callback); + + /** + * Retrieves the cache entry stored under the given key + * or null if no entry exists under that key. + * @param key cache key + * @param callback result callback + * @return an {@link HttpCacheEntry} or {@code null} if no + * entry exists + */ + Cancellable getEntry( + String key, FutureCallback callback); + + /** + * Deletes/invalidates/removes any cache entries currently + * stored under the given key. + * @param key + * @param callback result callback + */ + Cancellable removeEntry( + String key, FutureCallback callback); + + /** + * Atomically applies the given callback to processChallenge an existing cache + * entry under a given key. + * @param key indicates which entry to modify + * @param casOperation the CAS operation to perform. + * @param callback result callback + */ + Cancellable updateEntry( + String key, HttpCacheCASOperation casOperation, FutureCallback callback); + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java new file mode 100644 index 000000000..c991fd7c5 --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpAsyncCacheStorageAdaptor.java @@ -0,0 +1,98 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.cache; + +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.util.Args; + +/** + * {@link HttpAsyncCacheStorage} implementation that emulates asynchronous + * behavior using an instance of classic {@link HttpCacheStorage}. + * + * @since 5.0 + */ +public final class HttpAsyncCacheStorageAdaptor implements HttpAsyncCacheStorage { + + private final HttpCacheStorage cacheStorage; + + public HttpAsyncCacheStorageAdaptor(final HttpCacheStorage cacheStorage) { + this.cacheStorage = Args.notNull(cacheStorage, "Cache strorage"); + } + + public Cancellable putEntry(final String key, final HttpCacheEntry entry, final FutureCallback callback) { + Args.notEmpty(key, "Key"); + Args.notNull(entry, "Cache ehtry"); + Args.notNull(callback, "Callback"); + try { + cacheStorage.putEntry(key, entry); + callback.completed(Boolean.TRUE); + } catch (final Exception ex) { + callback.failed(ex); + } + return NOOP_CANCELLABLE; + } + + public Cancellable getEntry(final String key, final FutureCallback callback) { + Args.notEmpty(key, "Key"); + Args.notNull(callback, "Callback"); + try { + final HttpCacheEntry entry = cacheStorage.getEntry(key); + callback.completed(entry); + } catch (final Exception ex) { + callback.failed(ex); + } + return NOOP_CANCELLABLE; + } + + public Cancellable removeEntry(final String key, final FutureCallback callback) { + Args.notEmpty(key, "Key"); + Args.notNull(callback, "Callback"); + try { + cacheStorage.removeEntry(key); + callback.completed(Boolean.TRUE); + } catch (final Exception ex) { + callback.failed(ex); + } + return NOOP_CANCELLABLE; + } + + public Cancellable updateEntry( + final String key, final HttpCacheCASOperation casOperation, final FutureCallback callback) { + Args.notEmpty(key, "Key"); + Args.notNull(casOperation, "CAS operation"); + Args.notNull(callback, "Callback"); + try { + cacheStorage.updateEntry(key, casOperation); + callback.completed(Boolean.TRUE); + } catch (final Exception ex) { + callback.failed(ex); + } + return NOOP_CANCELLABLE; + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java index 88c2aa02a..e58efcb09 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/cache/HttpCacheStorage.java @@ -27,9 +27,9 @@ package org.apache.hc.client5.http.cache; /** - * New storage backends should implement this {@link HttpCacheStorage} - * interface. They can then be plugged into the existing caching - * {@link org.apache.hc.client5.http.classic.HttpClient} implementation. + * {@literal HttpCacheStorage} represents an abstract HTTP cache + * storage backend that can then be plugged into the classic + * (blocking) request execution pipeline. * * @since 4.1 */ diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java new file mode 100644 index 000000000..b1361cabb --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractBinaryAsyncCacheStorage.java @@ -0,0 +1,46 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer; + +/** + * Abstract cache backend for serialized binary objects capable of CAS (compare-and-swap) updates. + * + * @since 5.0 + */ +public abstract class AbstractBinaryAsyncCacheStorage extends AbstractSerializingAsyncCacheStorage { + + public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer serializer) { + super(maxUpdateRetries, serializer); + } + + public AbstractBinaryAsyncCacheStorage(final int maxUpdateRetries) { + super(maxUpdateRetries, ByteArrayCacheEntrySerializer.INSTANCE); + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java new file mode 100644 index 000000000..36c055f6c --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AbstractSerializingAsyncCacheStorage.java @@ -0,0 +1,228 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage; +import org.apache.hc.client5.http.cache.HttpCacheCASOperation; +import org.apache.hc.client5.http.cache.HttpCacheEntry; +import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer; +import org.apache.hc.client5.http.cache.HttpCacheStorageEntry; +import org.apache.hc.client5.http.cache.HttpCacheUpdateException; +import org.apache.hc.client5.http.cache.ResourceIOException; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.util.Args; + +/** + * Abstract cache backend for serialized objects capable of CAS (compare-and-swap) updates. + * + * @since 5.0 + */ +public abstract class AbstractSerializingAsyncCacheStorage implements HttpAsyncCacheStorage { + + private final int maxUpdateRetries; + private final HttpCacheEntrySerializer serializer; + + public AbstractSerializingAsyncCacheStorage(final int maxUpdateRetries, final HttpCacheEntrySerializer serializer) { + this.maxUpdateRetries = Args.notNegative(maxUpdateRetries, "Max retries"); + this.serializer = Args.notNull(serializer, "Cache entry serializer"); + } + + protected abstract String digestToStorageKey(String key); + + protected abstract T getStorageObject(CAS cas) throws ResourceIOException; + + protected abstract Cancellable store(String storageKey, T storageObject, FutureCallback callback); + + protected abstract Cancellable restore(String storageKey, FutureCallback callback); + + protected abstract Cancellable getForUpdateCAS(String storageKey, FutureCallback callback); + + protected abstract Cancellable updateCAS(String storageKey, CAS cas, T storageObject, FutureCallback callback); + + protected abstract Cancellable delete(String storageKey, FutureCallback callback); + + @Override + public final Cancellable putEntry( + final String key, final HttpCacheEntry entry, final FutureCallback callback) { + Args.notNull(key, "Storage key"); + Args.notNull(callback, "Callback"); + try { + final String storageKey = digestToStorageKey(key); + final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, entry)); + return store(storageKey, storageObject, callback); + } catch (final Exception ex) { + callback.failed(ex); + return NOOP_CANCELLABLE; + } + } + + @Override + public final Cancellable getEntry(final String key, final FutureCallback callback) { + Args.notNull(key, "Storage key"); + Args.notNull(callback, "Callback"); + try { + final String storageKey = digestToStorageKey(key); + return restore(storageKey, new FutureCallback() { + + @Override + public void completed(final T storageObject) { + try { + if (storageObject != null) { + final HttpCacheStorageEntry entry = serializer.deserialize(storageObject); + if (key.equals(entry.getKey())) { + callback.completed(entry.getContent()); + } else { + callback.completed(null); + } + } else { + callback.completed(null); + } + } catch (final Exception ex) { + callback.failed(ex); + } + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + }); + } catch (final Exception ex) { + callback.failed(ex); + return NOOP_CANCELLABLE; + } + } + + @Override + public final Cancellable removeEntry(final String key, final FutureCallback callback) { + Args.notNull(key, "Storage key"); + Args.notNull(callback, "Callback"); + try { + final String storageKey = digestToStorageKey(key); + return delete(storageKey, callback); + } catch (final Exception ex) { + callback.failed(ex); + return NOOP_CANCELLABLE; + } + } + + @Override + public final Cancellable updateEntry( + final String key, final HttpCacheCASOperation casOperation, final FutureCallback callback) { + Args.notNull(key, "Storage key"); + Args.notNull(casOperation, "CAS operation"); + Args.notNull(callback, "Callback"); + final ComplexCancellable complexCancellable = new ComplexCancellable(); + final AtomicInteger count = new AtomicInteger(0); + atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback); + return complexCancellable; + } + + private void atemmptUpdateEntry( + final String key, + final HttpCacheCASOperation casOperation, + final ComplexCancellable complexCancellable, + final AtomicInteger count, + final FutureCallback callback) { + try { + final String storageKey = digestToStorageKey(key); + complexCancellable.setDependency(getForUpdateCAS(storageKey, new FutureCallback() { + + @Override + public void completed(final CAS cas) { + try { + HttpCacheStorageEntry storageEntry = cas != null ? serializer.deserialize(getStorageObject(cas)) : null; + if (storageEntry != null && !key.equals(storageEntry.getKey())) { + storageEntry = null; + } + final HttpCacheEntry existingEntry = storageEntry != null ? storageEntry.getContent() : null; + final HttpCacheEntry updatedEntry = casOperation.execute(existingEntry); + if (existingEntry == null) { + putEntry(key, updatedEntry, callback); + } else { + final T storageObject = serializer.serialize(new HttpCacheStorageEntry(key, updatedEntry)); + complexCancellable.setDependency(updateCAS(storageKey, cas, storageObject, new FutureCallback() { + + @Override + public void completed(final Boolean result) { + if (result) { + callback.completed(result); + } else { + if (!complexCancellable.isCancelled()) { + final int numRetries = count.incrementAndGet(); + if (numRetries >= maxUpdateRetries) { + callback.failed(new HttpCacheUpdateException("Cache update failed after " + numRetries + " retries")); + } else { + atemmptUpdateEntry(key, casOperation, complexCancellable, count, callback); + } + } + } + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + })); + } + } catch (final Exception ex) { + callback.failed(ex); + } + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + })); + } catch (final Exception ex) { + callback.failed(ex); + } + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java new file mode 100644 index 000000000..55273efac --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/ComplexCancellable.java @@ -0,0 +1,74 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.util.Args; + +/** + * TODO: replace with ComplexCancellable from HttpCore 5.0b2 + */ +final class ComplexCancellable implements Cancellable { + + private final AtomicReference dependencyRef; + private final AtomicBoolean cancelled; + + public ComplexCancellable() { + this.dependencyRef = new AtomicReference<>(null); + this.cancelled = new AtomicBoolean(false); + } + + public boolean isCancelled() { + return cancelled.get(); + } + + public void setDependency(final Cancellable dependency) { + Args.notNull(dependency, "dependency"); + if (!cancelled.get()) { + dependencyRef.set(dependency); + } else { + dependency.cancel(); + } + } + + @Override + public boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + final Cancellable dependency = dependencyRef.getAndSet(null); + if (dependency != null) { + dependency.cancel(); + } + return true; + } else { + return false; + } + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java new file mode 100644 index 000000000..478c578ed --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpAsyncCacheStorage.java @@ -0,0 +1,250 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache.memcached; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; + +import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer; +import org.apache.hc.client5.http.cache.ResourceIOException; +import org.apache.hc.client5.http.impl.cache.AbstractBinaryAsyncCacheStorage; +import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer; +import org.apache.hc.client5.http.impl.cache.CacheConfig; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.util.Args; + +import net.spy.memcached.CASResponse; +import net.spy.memcached.CASValue; +import net.spy.memcached.MemcachedClient; +import net.spy.memcached.internal.GetCompletionListener; +import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationCompletionListener; +import net.spy.memcached.internal.OperationFuture; + +/** + *

+ * This class is a storage backend that uses an external memcached + * for storing cached origin responses. This storage option provides a + * couple of interesting advantages over the default in-memory storage + * backend: + *

+ *
    + *
  1. in-memory cached objects can survive an application restart since + * they are held in a separate process
  2. + *
  3. it becomes possible for several cooperating applications to share + * a large memcached farm together
  4. + *
+ *

+ * Note that in a shared memcached pool setting you may wish to make use + * of the Ketama consistent hashing algorithm to reduce the number of + * cache misses that might result if one of the memcached cluster members + * fails (see the + * KetamaConnectionFactory). + *

+ *

+ * Because memcached places limits on the size of its keys, we need to + * introduce a key hashing scheme to map the annotated URLs the higher-level + * caching HTTP client wants to use as keys onto ones that are suitable + * for use with memcached. Please see {@link KeyHashingScheme} if you would + * like to use something other than the provided {@link SHA256KeyHashingScheme}. + *

+ * + *

+ * Please refer to the + * memcached documentation and in particular to the documentation for + * the spymemcached + * documentation for details about how to set up and configure memcached + * and the Java client used here, respectively. + *

+ * + * @since 5.0 + */ +public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStorage> { + + private final MemcachedClient client; + private final KeyHashingScheme keyHashingScheme; + + /** + * Create a storage backend talking to a memcached instance + * listening on the specified host and port. This is useful if you + * just have a single local memcached instance running on the same + * machine as your application, for example. + * @param address where the memcached daemon is running + * @throws IOException in case of an error + */ + public MemcachedHttpAsyncCacheStorage(final InetSocketAddress address) throws IOException { + this(new MemcachedClient(address)); + } + + /** + * Create a storage backend using the pre-configured given + * memcached client. + * @param cache client to use for communicating with memcached + */ + public MemcachedHttpAsyncCacheStorage(final MemcachedClient cache) { + this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE); + } + + /** + * Create a storage backend using the given memcached client and + * applying the given cache configuration, serialization, and hashing + * mechanisms. + * @param client how to talk to memcached + * @param config apply HTTP cache-related options + * @param serializer alternative serialization mechanism + * @param keyHashingScheme how to map higher-level logical "storage keys" + * onto "cache keys" suitable for use with memcached + */ + public MemcachedHttpAsyncCacheStorage( + final MemcachedClient client, + final CacheConfig config, + final HttpCacheEntrySerializer serializer, + final KeyHashingScheme keyHashingScheme) { + super((config != null ? config : CacheConfig.DEFAULT).getMaxUpdateRetries(), + serializer != null ? serializer : ByteArrayCacheEntrySerializer.INSTANCE); + this.client = Args.notNull(client, "Memcached client"); + this.keyHashingScheme = keyHashingScheme; + } + + @Override + protected String digestToStorageKey(final String key) { + return keyHashingScheme.hash(key); + } + + private byte[] castAsByteArray(final Object storageObject) throws ResourceIOException { + if (storageObject == null) { + return null; + } + if (storageObject instanceof byte[]) { + return (byte[]) storageObject; + } else { + throw new ResourceIOException("Unexpected cache content: " + storageObject.getClass()); + } + } + + @Override + protected byte[] getStorageObject(final CASValue casValue) throws ResourceIOException { + return castAsByteArray(casValue.getValue()); + } + + private Cancellable operation(final OperationFuture operationFuture, final FutureCallback callback) { + operationFuture.addListener(new OperationCompletionListener() { + + @Override + public void onComplete(final OperationFuture future) throws Exception { + try { + callback.completed(operationFuture.get()); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof Exception) { + callback.failed((Exception) ex.getCause()); + } else { + callback.failed(ex); + } + } + } + + }); + return new Cancellable() { + + @Override + public boolean cancel() { + return operationFuture.cancel(); + } + + }; + } + + @Override + protected Cancellable store(final String storageKey, final byte[] storageObject, final FutureCallback callback) { + return operation(client.set(storageKey, 0, storageObject), callback); + } + + @Override + protected Cancellable restore(final String storageKey, final FutureCallback callback) { + final GetFuture getFuture = client.asyncGet(storageKey); + getFuture.addListener(new GetCompletionListener() { + + @Override + public void onComplete(final GetFuture future) throws Exception { + try { + callback.completed(castAsByteArray(getFuture.get())); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof Exception) { + callback.failed((Exception) ex.getCause()); + } else { + callback.failed(ex); + } + } + } + + }); + return new Cancellable() { + + @Override + public boolean cancel() { + return getFuture.cancel(true); + } + + }; + } + + @Override + protected Cancellable getForUpdateCAS(final String storageKey, final FutureCallback> callback) { + return operation(client.asyncGets(storageKey), callback); + } + + @Override + protected Cancellable updateCAS( + final String storageKey, final CASValue casValue, final byte[] storageObject, final FutureCallback callback) { + return operation(client.asyncCAS(storageKey, casValue.getCas(), storageObject), new FutureCallback() { + + @Override + public void completed(final CASResponse result) { + callback.completed(result == CASResponse.OK); + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + }); + } + + @Override + protected Cancellable delete(final String storageKey, final FutureCallback callback) { + return operation(client.delete(storageKey), callback); + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java index 39d209eaf..6a6195ccc 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/memcached/MemcachedHttpCacheStorage.java @@ -32,14 +32,13 @@ import java.net.InetSocketAddress; import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer; import org.apache.hc.client5.http.cache.ResourceIOException; import org.apache.hc.client5.http.impl.cache.AbstractBinaryCacheStorage; -import org.apache.hc.client5.http.impl.cache.CacheConfig; import org.apache.hc.client5.http.impl.cache.ByteArrayCacheEntrySerializer; +import org.apache.hc.client5.http.impl.cache.CacheConfig; import org.apache.hc.core5.util.Args; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; import net.spy.memcached.MemcachedClient; -import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.OperationTimeoutException; /** @@ -82,7 +81,7 @@ import net.spy.memcached.OperationTimeoutException; */ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage> { - private final MemcachedClientIF client; + private final MemcachedClient client; private final KeyHashingScheme keyHashingScheme; /** @@ -102,7 +101,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStoragememcached client. * @param cache client to use for communicating with memcached */ - public MemcachedHttpCacheStorage(final MemcachedClientIF cache) { + public MemcachedHttpCacheStorage(final MemcachedClient cache) { this(cache, CacheConfig.DEFAULT, ByteArrayCacheEntrySerializer.INSTANCE, SHA256KeyHashingScheme.INSTANCE); } @@ -117,7 +116,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage serializer, final KeyHashingScheme keyHashingScheme) { @@ -134,11 +133,7 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage. + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.client5.http.cache.HttpCacheCASOperation; +import org.apache.hc.client5.http.cache.HttpCacheEntry; +import org.apache.hc.client5.http.cache.HttpCacheStorageEntry; +import org.apache.hc.client5.http.cache.HttpCacheUpdateException; +import org.apache.hc.client5.http.cache.ResourceIOException; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class TestAbstractSerializingAsyncCacheStorage { + + @Mock + private Cancellable cancellable; + @Mock + private FutureCallback operationCallback; + @Mock + private FutureCallback cacheEntryCallback; + + private AbstractBinaryAsyncCacheStorage impl; + + public static byte[] serialize(final String key, final HttpCacheEntry value) throws ResourceIOException { + return ByteArrayCacheEntrySerializer.INSTANCE.serialize(new HttpCacheStorageEntry(key, value)); + } + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + impl = Mockito.mock(AbstractBinaryAsyncCacheStorage.class, + Mockito.withSettings().defaultAnswer(Answers.CALLS_REAL_METHODS).useConstructor(3)); + } + + @Test + public void testCachePut() throws Exception { + final String key = "foo"; + final HttpCacheEntry value = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.store( + Mockito.eq("bar"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(2); + callback.completed(true); + return cancellable; + } + + }); + + impl.putEntry(key, value, operationCallback); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(byte[].class); + Mockito.verify(impl).store(Mockito.eq("bar"), argumentCaptor.capture(), Mockito.>any()); + Assert.assertArrayEquals(serialize(key, value), argumentCaptor.getValue()); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testCacheGetNullEntry() throws Exception { + final String key = "foo"; + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed(null); + return cancellable; + } + + }); + + impl.getEntry(key, cacheEntryCallback); + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class); + Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture()); + Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue()); + Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.>any()); + } + + @Test + public void testCacheGet() throws Exception { + final String key = "foo"; + final HttpCacheEntry value = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed(serialize(key, value)); + return cancellable; + } + + }); + + impl.getEntry(key, cacheEntryCallback); + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class); + Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture()); + final HttpCacheEntry resultingEntry = argumentCaptor.getValue(); + Assert.assertThat(resultingEntry, HttpCacheEntryMatcher.equivalent(value)); + Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.>any()); + } + + @Test + public void testCacheGetKeyMismatch() throws Exception { + final String key = "foo"; + final HttpCacheEntry value = HttpTestUtils.makeCacheEntry(); + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.restore(Mockito.eq("bar"), Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed(serialize("not-foo", value)); + return cancellable; + } + + }); + + impl.getEntry(key, cacheEntryCallback); + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(HttpCacheEntry.class); + Mockito.verify(cacheEntryCallback).completed(argumentCaptor.capture()); + Assert.assertThat(argumentCaptor.getValue(), CoreMatchers.nullValue()); + Mockito.verify(impl).restore(Mockito.eq("bar"), Mockito.>any()); + } + + @Test + public void testCacheRemove() throws Exception{ + final String key = "foo"; + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.delete( + Mockito.eq("bar"), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed(true); + return cancellable; + } + + }); + impl.removeEntry(key, operationCallback); + + Mockito.verify(impl).delete("bar", operationCallback); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testCacheUpdateNullEntry() throws Exception { + final String key = "foo"; + final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed(null); + return cancellable; + } + + }); + Mockito.when(impl.store( + Mockito.eq("bar"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(2); + callback.completed(true); + return cancellable; + } + + }); + + impl.updateEntry(key, new HttpCacheCASOperation() { + + @Override + public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException { + Assert.assertThat(existing, CoreMatchers.nullValue()); + return updatedValue; + } + + }, operationCallback); + + Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.>any()); + Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.any(), Mockito.>any()); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testCacheCASUpdate() throws Exception { + final String key = "foo"; + final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry(); + final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed("stuff"); + return cancellable; + } + + }); + Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue)); + Mockito.when(impl.updateCAS( + Mockito.eq("bar"), + Mockito.eq("stuff"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(3); + callback.completed(true); + return cancellable; + } + + }); + + impl.updateEntry(key, new HttpCacheCASOperation() { + + @Override + public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException { + return updatedValue; + } + + }, operationCallback); + + Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.>any()); + Mockito.verify(impl).getStorageObject("stuff"); + Mockito.verify(impl).updateCAS(Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.any(), Mockito.>any()); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testCacheCASUpdateKeyMismatch() throws Exception { + final String key = "foo"; + final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry(); + final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.>any())).thenAnswer( + new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed("stuff"); + return cancellable; + } + + }); + Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize("not-foo", existingValue)); + Mockito.when(impl.store( + Mockito.eq("bar"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(2); + callback.completed(true); + return cancellable; + } + + }); + + impl.updateEntry(key, new HttpCacheCASOperation() { + + @Override + public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException { + Assert.assertThat(existing, CoreMatchers.nullValue()); + return updatedValue; + } + + }, operationCallback); + + Mockito.verify(impl).getForUpdateCAS(Mockito.eq("bar"), Mockito.>any()); + Mockito.verify(impl).getStorageObject("stuff"); + Mockito.verify(impl, Mockito.never()).updateCAS( + Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.any(), Mockito.>any()); + Mockito.verify(impl).store(Mockito.eq("bar"), Mockito.any(), Mockito.>any()); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testSingleCacheUpdateRetry() throws Exception { + final String key = "foo"; + final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry(); + final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.>any())).thenAnswer( + new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed("stuff"); + return cancellable; + } + + }); + Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue)); + final AtomicInteger count = new AtomicInteger(0); + Mockito.when(impl.updateCAS( + Mockito.eq("bar"), + Mockito.eq("stuff"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(3); + if (count.incrementAndGet() == 1) { + callback.completed(false); + } else { + callback.completed(true); + } + return cancellable; + } + + }); + + impl.updateEntry(key, new HttpCacheCASOperation() { + + @Override + public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException { + return updatedValue; + } + + }, operationCallback); + + Mockito.verify(impl, Mockito.times(2)).getForUpdateCAS(Mockito.eq("bar"), Mockito.>any()); + Mockito.verify(impl, Mockito.times(2)).getStorageObject("stuff"); + Mockito.verify(impl, Mockito.times(2)).updateCAS( + Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.any(), Mockito.>any()); + Mockito.verify(operationCallback).completed(Boolean.TRUE); + } + + @Test + public void testCacheUpdateFail() throws Exception { + final String key = "foo"; + final HttpCacheEntry existingValue = HttpTestUtils.makeCacheEntry(); + final HttpCacheEntry updatedValue = HttpTestUtils.makeCacheEntry(); + + Mockito.when(impl.digestToStorageKey(key)).thenReturn("bar"); + Mockito.when(impl.getForUpdateCAS(Mockito.eq("bar"), Mockito.>any())).thenAnswer( + new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(1); + callback.completed("stuff"); + return cancellable; + } + + }); + Mockito.when(impl.getStorageObject("stuff")).thenReturn(serialize(key, existingValue)); + final AtomicInteger count = new AtomicInteger(0); + Mockito.when(impl.updateCAS( + Mockito.eq("bar"), + Mockito.eq("stuff"), + Mockito.any(), + Mockito.>any())).thenAnswer(new Answer() { + + @Override + public Cancellable answer(final InvocationOnMock invocation) throws Throwable { + final FutureCallback callback = invocation.getArgument(3); + if (count.incrementAndGet() <= 3) { + callback.completed(false); + } else { + callback.completed(true); + } + return cancellable; + } + + }); + + impl.updateEntry(key, new HttpCacheCASOperation() { + + @Override + public HttpCacheEntry execute(final HttpCacheEntry existing) throws ResourceIOException { + return updatedValue; + } + + }, operationCallback); + + Mockito.verify(impl, Mockito.times(3)).getForUpdateCAS(Mockito.eq("bar"), Mockito.>any()); + Mockito.verify(impl, Mockito.times(3)).getStorageObject("stuff"); + Mockito.verify(impl, Mockito.times(3)).updateCAS( + Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.any(), Mockito.>any()); + Mockito.verify(operationCallback).failed(Mockito.any()); + } + +}