New APIs for cache entry bulk retrieval; bulk retrieval support by Memcached storage implementation

This commit is contained in:
Oleg Kalnichevski 2017-12-15 14:06:23 +01:00
parent 002f40f9d3
commit 29666a1ad4
13 changed files with 423 additions and 4 deletions

View File

@ -26,6 +26,9 @@
*/
package org.apache.hc.client5.http.cache;
import java.util.Collection;
import java.util.Map;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
@ -87,4 +90,14 @@ public interface HttpAsyncCacheStorage {
Cancellable updateEntry(
String key, HttpCacheCASOperation casOperation, FutureCallback<Boolean> callback);
/**
* Retrieves multiple cache entries stored under the given keys. Some implementations
* may use a single bulk operation to do the retrieval.
*
* @param keys cache keys
* @param callback result callback
*/
Cancellable getEntries(Collection<String> keys, FutureCallback<Map<String, HttpCacheEntry>> callback);
}

View File

@ -26,6 +26,9 @@
*/
package org.apache.hc.client5.http.cache;
import java.util.Collection;
import java.util.Map;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.util.Args;
@ -95,4 +98,15 @@ public final class HttpAsyncCacheStorageAdaptor implements HttpAsyncCacheStorage
return NOOP_CANCELLABLE;
}
public Cancellable getEntries(final Collection<String> keys, final FutureCallback<Map<String, HttpCacheEntry>> callback) {
Args.notNull(keys, "Key");
Args.notNull(callback, "Callback");
try {
callback.completed(cacheStorage.getEntries(keys));
} catch (final Exception ex) {
callback.failed(ex);
}
return NOOP_CANCELLABLE;
}
}

View File

@ -26,6 +26,9 @@
*/
package org.apache.hc.client5.http.cache;
import java.util.Collection;
import java.util.Map;
/**
* {@literal HttpCacheStorage} represents an abstract HTTP cache
* storage backend that can then be plugged into the classic
@ -72,4 +75,16 @@ public interface HttpCacheStorage {
void updateEntry(
String key, HttpCacheCASOperation casOperation) throws ResourceIOException, HttpCacheUpdateException;
/**
* Retrieves multiple cache entries stored under the given keys. Some implementations
* may use a single bulk operation to do the retrieval.
*
* @param keys cache keys
* @return an map of {@link HttpCacheEntry}s.
*
* @since 5.0
*/
Map<String, HttpCacheEntry> getEntries(Collection<String> keys) throws ResourceIOException;
}

View File

@ -26,6 +26,11 @@
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.cache.HttpAsyncCacheStorage;
@ -68,6 +73,8 @@ public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements Ht
protected abstract Cancellable delete(String storageKey, FutureCallback<Boolean> callback);
protected abstract Cancellable bulkRestore(Collection<String> storageKeys, FutureCallback<Map<String, T>> callback);
@Override
public final Cancellable putEntry(
final String key, final HttpCacheEntry entry, final FutureCallback<Boolean> callback) {
@ -225,4 +232,49 @@ public abstract class AbstractSerializingAsyncCacheStorage<T, CAS> implements Ht
}
}
@Override
public final Cancellable getEntries(final Collection<String> keys, final FutureCallback<Map<String, HttpCacheEntry>> callback) {
Args.notNull(keys, "Storage keys");
Args.notNull(callback, "Callback");
try {
final List<String> storageKeys = new ArrayList<>(keys.size());
for (final String key: keys) {
storageKeys.add(digestToStorageKey(key));
}
return bulkRestore(storageKeys, new FutureCallback<Map<String, T>>() {
@Override
public void completed(final Map<String, T> storageObjects) {
try {
final Map<String, HttpCacheEntry> resultMap = new HashMap<>();
for (final Map.Entry<String, T> storageEntry: storageObjects.entrySet()) {
final String key = storageEntry.getKey();
final HttpCacheStorageEntry entry = serializer.deserialize(storageEntry.getValue());
if (key.equals(entry.getKey())) {
resultMap.put(key, entry.getContent());
}
}
callback.completed(resultMap);
} catch (final Exception ex) {
callback.failed(ex);
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
} catch (final Exception ex) {
callback.failed(ex);
return NOOP_CANCELLABLE;
}
}
}

View File

@ -26,11 +26,17 @@
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.HttpCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.core5.util.Args;
@ -64,6 +70,8 @@ public abstract class AbstractSerializingCacheStorage<T, CAS> implements HttpCac
protected abstract void delete(String storageKey) throws ResourceIOException;
protected abstract Map<String, T> bulkRestore(Collection<String> storageKeys) throws ResourceIOException;
@Override
public final void putEntry(final String key, final HttpCacheEntry entry) throws ResourceIOException {
final String storageKey = digestToStorageKey(key);
@ -124,4 +132,23 @@ public abstract class AbstractSerializingCacheStorage<T, CAS> implements HttpCac
}
}
@Override
public final Map<String, HttpCacheEntry> getEntries(final Collection<String> keys) throws ResourceIOException {
Args.notNull(keys, "Storage keys");
final List<String> storageKeys = new ArrayList<>(keys.size());
for (final String key: keys) {
storageKeys.add(digestToStorageKey(key));
}
final Map<String, T> storageObjectMap = bulkRestore(storageKeys);
final Map<String, HttpCacheEntry> resultMap = new HashMap<>();
for (final Map.Entry<String, T> storageEntry: storageObjectMap.entrySet()) {
final String key = storageEntry.getKey();
final HttpCacheStorageEntry entry = serializer.deserialize(storageEntry.getValue());
if (key.equals(entry.getKey())) {
resultMap.put(key, entry.getContent());
}
}
return resultMap;
}
}

View File

@ -26,12 +26,17 @@
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.util.Args;
/**
* Basic {@link HttpCacheStorage} implementation backed by an instance of
@ -97,4 +102,17 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {
entries.put(url, casOperation.execute(existingEntry));
}
@Override
public Map<String, HttpCacheEntry> getEntries(final Collection<String> keys) throws ResourceIOException {
Args.notNull(keys, "Key");
final Map<String, HttpCacheEntry> resultMap = new HashMap<>(keys.size());
for (final String key: keys) {
final HttpCacheEntry entry = getEntry(key);
if (entry != null) {
resultMap.put(key, entry);
}
}
return resultMap;
}
}

View File

@ -28,7 +28,10 @@ package org.apache.hc.client5.http.impl.cache;
import java.io.Closeable;
import java.lang.ref.ReferenceQueue;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -150,6 +153,19 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
}
}
@Override
public Map<String, HttpCacheEntry> getEntries(final Collection<String> keys) throws ResourceIOException {
Args.notNull(keys, "Key");
final Map<String, HttpCacheEntry> resultMap = new HashMap<>(keys.size());
for (final String key: keys) {
final HttpCacheEntry entry = getEntry(key);
if (entry != null) {
resultMap.put(key, entry);
}
}
return resultMap;
}
public void cleanResources() {
if (this.active.get()) {
ResourceReference ref;

View File

@ -26,6 +26,10 @@
*/
package org.apache.hc.client5.http.impl.cache.ehcache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
import org.apache.hc.client5.http.cache.ResourceIOException;
@ -131,4 +135,16 @@ public class EhcacheHttpCacheStorage<T> extends AbstractSerializingCacheStorage<
cache.remove(storageKey);
}
@Override
protected Map<String, T> bulkRestore(final Collection<String> storageKeys) throws ResourceIOException {
final Map<String, T> resultMap = new HashMap<>();
for (final String storageKey: storageKeys) {
final T storageObject = cache.get(storageKey);
if (storageObject != null) {
resultMap.put(storageKey, storageObject);
}
}
return resultMap;
}
}

View File

@ -28,6 +28,9 @@ package org.apache.hc.client5.http.impl.cache.memcached;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
@ -42,6 +45,9 @@ import org.apache.hc.core5.util.Args;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.BulkGetCompletionListener;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.GetCompletionListener;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationCompletionListener;
@ -247,4 +253,29 @@ public class MemcachedHttpAsyncCacheStorage extends AbstractBinaryAsyncCacheStor
return operation(client.delete(storageKey), callback);
}
@Override
protected Cancellable bulkRestore(final Collection<String> storageKeys, final FutureCallback<Map<String, byte[]>> callback) {
final BulkFuture<Map<String, Object>> future = client.asyncGetBulk(storageKeys);
future.addListener(new BulkGetCompletionListener() {
@Override
public void onComplete(final BulkGetFuture<?> future) throws Exception {
final Map<String, ?> storageObjectMap = future.get();
final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
}
callback.completed(resultMap);
}
});
return new Cancellable() {
@Override
public boolean cancel() {
return future.cancel(true);
}
};
}
}

View File

@ -28,6 +28,9 @@ package org.apache.hc.client5.http.impl.cache.memcached;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheEntrySerializer;
import org.apache.hc.client5.http.cache.ResourceIOException;
@ -182,4 +185,14 @@ public class MemcachedHttpCacheStorage extends AbstractBinaryCacheStorage<CASVal
client.delete(storageKey);
}
@Override
protected Map<String, byte[]> bulkRestore(final Collection<String> storageKeys) throws ResourceIOException {
final Map<String, ?> storageObjectMap = client.getBulk(storageKeys);
final Map<String, byte[]> resultMap = new HashMap<>(storageObjectMap.size());
for (final Map.Entry<String, ?> resultEntry: storageObjectMap.entrySet()) {
resultMap.put(resultEntry.getKey(), castAsByteArray(resultEntry.getValue()));
}
return resultMap;
}
}

View File

@ -26,12 +26,13 @@
*/
package org.apache.hc.client5.http.impl.cache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheStorage;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.ResourceIOException;
class SimpleHttpCacheStorage implements HttpCacheStorage {
@ -65,4 +66,16 @@ class SimpleHttpCacheStorage implements HttpCacheStorage {
map.put(key,v2);
}
@Override
public Map<String, HttpCacheEntry> getEntries(final Collection<String> keys) throws ResourceIOException {
final Map<String, HttpCacheEntry> resultMap = new HashMap<>(keys.size());
for (final String key: keys) {
final HttpCacheEntry entry = getEntry(key);
if (entry != null) {
resultMap.put(key, entry);
}
}
return resultMap;
}
}

View File

@ -26,6 +26,13 @@
*/
package org.apache.hc.client5.http.impl.cache;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
@ -57,6 +64,8 @@ public class TestAbstractSerializingAsyncCacheStorage {
private FutureCallback<Boolean> operationCallback;
@Mock
private FutureCallback<HttpCacheEntry> cacheEntryCallback;
@Mock
private FutureCallback<Map<String, HttpCacheEntry>> bulkCacheEntryCallback;
private AbstractBinaryAsyncCacheStorage<String> impl;
@ -447,4 +456,102 @@ public class TestAbstractSerializingAsyncCacheStorage {
Mockito.verify(operationCallback).failed(Mockito.<HttpCacheUpdateException>any());
}
@Test
@SuppressWarnings("unchecked")
public void testBulkGet() throws Exception {
final String key1 = "foo this";
final String key2 = "foo that";
final String storageKey1 = "bar this";
final String storageKey2 = "bar that";
final HttpCacheEntry value1 = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry value2 = HttpTestUtils.makeCacheEntry();
when(impl.digestToStorageKey(key1)).thenReturn(storageKey1);
when(impl.digestToStorageKey(key2)).thenReturn(storageKey2);
when(impl.bulkRestore(
Mockito.<String>anyCollection(),
Mockito.<FutureCallback<Map<String, byte[]>>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final Collection<String> keys = invocation.getArgument(0);
final FutureCallback<Map<String, byte[]>> callback = invocation.getArgument(1);
final Map<String, byte[]> resultMap = new HashMap<>();
if (keys.contains(storageKey1)) {
resultMap.put(storageKey1, serialize(key1, value1));
}
if (keys.contains(storageKey2)) {
resultMap.put(storageKey2, serialize(key2, value2));
}
callback.completed(resultMap);
return cancellable;
}
});
impl.getEntries(Arrays.asList(key1, key2), bulkCacheEntryCallback);
final ArgumentCaptor<Map<String, HttpCacheEntry>> argumentCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.verify(bulkCacheEntryCallback).completed(argumentCaptor.capture());
final Map<String, HttpCacheEntry> entryMap = argumentCaptor.getValue();
Assert.assertThat(entryMap, CoreMatchers.notNullValue());
Assert.assertThat(entryMap.get(key1), HttpCacheEntryMatcher.equivalent(value1));
Assert.assertThat(entryMap.get(key2), HttpCacheEntryMatcher.equivalent(value2));
verify(impl).digestToStorageKey(key1);
verify(impl).digestToStorageKey(key2);
verify(impl).bulkRestore(
Mockito.eq(Arrays.asList(storageKey1, storageKey2)),
Mockito.<FutureCallback<Map<String, byte[]>>>any());
}
@Test
@SuppressWarnings("unchecked")
public void testBulkGetKeyMismatch() throws Exception {
final String key1 = "foo this";
final String key2 = "foo that";
final String storageKey1 = "bar this";
final String storageKey2 = "bar that";
final HttpCacheEntry value1 = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry value2 = HttpTestUtils.makeCacheEntry();
when(impl.digestToStorageKey(key1)).thenReturn(storageKey1);
when(impl.digestToStorageKey(key2)).thenReturn(storageKey2);
when(impl.bulkRestore(
Mockito.<String>anyCollection(),
Mockito.<FutureCallback<Map<String, byte[]>>>any())).thenAnswer(new Answer<Cancellable>() {
@Override
public Cancellable answer(final InvocationOnMock invocation) throws Throwable {
final Collection<String> keys = invocation.getArgument(0);
final FutureCallback<Map<String, byte[]>> callback = invocation.getArgument(1);
final Map<String, byte[]> resultMap = new HashMap<>();
if (keys.contains(storageKey1)) {
resultMap.put(storageKey1, serialize(key1, value1));
}
if (keys.contains(storageKey2)) {
resultMap.put(storageKey2, serialize("not foo", value2));
}
callback.completed(resultMap);
return cancellable;
}
});
impl.getEntries(Arrays.asList(key1, key2), bulkCacheEntryCallback);
final ArgumentCaptor<Map<String, HttpCacheEntry>> argumentCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.verify(bulkCacheEntryCallback).completed(argumentCaptor.capture());
final Map<String, HttpCacheEntry> entryMap = argumentCaptor.getValue();
Assert.assertThat(entryMap, CoreMatchers.notNullValue());
Assert.assertThat(entryMap.get(key1), HttpCacheEntryMatcher.equivalent(value1));
Assert.assertThat(entryMap.get(key2), CoreMatchers.nullValue());
verify(impl).digestToStorageKey(key1);
verify(impl).digestToStorageKey(key2);
verify(impl).bulkRestore(
Mockito.eq(Arrays.asList(storageKey1, storageKey2)),
Mockito.<FutureCallback<Map<String, byte[]>>>any());
}
}

View File

@ -30,9 +30,14 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
import org.apache.hc.client5.http.cache.HttpCacheStorageEntry;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheUpdateException;
import org.apache.hc.client5.http.cache.ResourceIOException;
import org.hamcrest.CoreMatchers;
@ -42,6 +47,8 @@ import org.junit.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@SuppressWarnings("boxing") // test code
public class TestAbstractSerializingCacheStorage {
@ -253,4 +260,81 @@ public class TestAbstractSerializingCacheStorage {
verify(impl, Mockito.times(3)).getStorageObject("stuff");
verify(impl, Mockito.times(3)).updateCAS(Mockito.eq("bar"), Mockito.eq("stuff"), Mockito.<byte[]>any());
}
@Test
public void testBulkGet() throws Exception {
final String key1 = "foo this";
final String key2 = "foo that";
final String storageKey1 = "bar this";
final String storageKey2 = "bar that";
final HttpCacheEntry value1 = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry value2 = HttpTestUtils.makeCacheEntry();
when(impl.digestToStorageKey(key1)).thenReturn(storageKey1);
when(impl.digestToStorageKey(key2)).thenReturn(storageKey2);
when(impl.bulkRestore(Mockito.<String>anyCollection())).thenAnswer(new Answer<Map<String, byte[]>>() {
@Override
public Map<String, byte[]> answer(final InvocationOnMock invocation) throws Throwable {
final Collection<String> keys = invocation.getArgument(0);
final Map<String, byte[]> resultMap = new HashMap<>();
if (keys.contains(storageKey1)) {
resultMap.put(storageKey1, serialize(key1, value1));
}
if (keys.contains(storageKey2)) {
resultMap.put(storageKey2, serialize(key2, value2));
}
return resultMap;
}
});
final Map<String, HttpCacheEntry> entryMap = impl.getEntries(Arrays.asList(key1, key2));
Assert.assertThat(entryMap, CoreMatchers.notNullValue());
Assert.assertThat(entryMap.get(key1), HttpCacheEntryMatcher.equivalent(value1));
Assert.assertThat(entryMap.get(key2), HttpCacheEntryMatcher.equivalent(value2));
verify(impl).digestToStorageKey(key1);
verify(impl).digestToStorageKey(key2);
verify(impl).bulkRestore(Arrays.asList(storageKey1, storageKey2));
}
@Test
public void testBulkGetKeyMismatch() throws Exception {
final String key1 = "foo this";
final String key2 = "foo that";
final String storageKey1 = "bar this";
final String storageKey2 = "bar that";
final HttpCacheEntry value1 = HttpTestUtils.makeCacheEntry();
final HttpCacheEntry value2 = HttpTestUtils.makeCacheEntry();
when(impl.digestToStorageKey(key1)).thenReturn(storageKey1);
when(impl.digestToStorageKey(key2)).thenReturn(storageKey2);
when(impl.bulkRestore(Mockito.<String>anyCollection())).thenAnswer(new Answer<Map<String, byte[]>>() {
@Override
public Map<String, byte[]> answer(final InvocationOnMock invocation) throws Throwable {
final Collection<String> keys = invocation.getArgument(0);
final Map<String, byte[]> resultMap = new HashMap<>();
if (keys.contains(storageKey1)) {
resultMap.put(storageKey1, serialize(key1, value1));
}
if (keys.contains(storageKey2)) {
resultMap.put(storageKey2, serialize("not foo", value2));
}
return resultMap;
}
});
final Map<String, HttpCacheEntry> entryMap = impl.getEntries(Arrays.asList(key1, key2));
Assert.assertThat(entryMap, CoreMatchers.notNullValue());
Assert.assertThat(entryMap.get(key1), HttpCacheEntryMatcher.equivalent(value1));
Assert.assertThat(entryMap.get(key2), CoreMatchers.nullValue());
verify(impl).digestToStorageKey(key1);
verify(impl).digestToStorageKey(key2);
verify(impl).bulkRestore(Arrays.asList(storageKey1, storageKey2));
}
}