mirror of https://github.com/apache/druid.git

Merge pull request #58 from metamx/cache-bulkget

modify CacheBroker interface to implement bulk-get

commit ad6f962000
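In short, this commit collapses the old CacheBroker/Cache pair into a single Cache interface whose entries are addressed by a (namespace, key) pair and which can be read in bulk, so the broker can fetch every per-segment cache entry for a query in one round trip instead of one lookup per segment. A rough usage sketch against the new interface follows; Cache and Cache.NamedKey are the types introduced in the diff below, while the BulkGetSketch class, the readThrough helper, and the per-segment key map are invented for illustration only:

import com.metamx.druid.client.cache.Cache;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BulkGetSketch
{
  // One NamedKey per segment: the namespace is the segment identifier and the key is the
  // per-segment query cache key; a single getBulk() replaces one get() per segment.
  public static void readThrough(Cache cache, Map<String, byte[]> perSegmentKeys)
  {
    List<Cache.NamedKey> keys = new ArrayList<Cache.NamedKey>();
    for (Map.Entry<String, byte[]> e : perSegmentKeys.entrySet()) {
      keys.add(new Cache.NamedKey(e.getKey(), e.getValue()));
    }

    Map<Cache.NamedKey, byte[]> cached = cache.getBulk(keys);
    for (Cache.NamedKey key : keys) {
      byte[] value = cached.get(key);
      if (value == null) {
        // cache miss: compute or fetch the result, then cache.put(key, serializedResult)
      }
    }
  }
}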
CachingClusteredClient.java:

@@ -28,6 +28,7 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
@@ -41,7 +42,6 @@ import com.metamx.druid.TimelineObjectHolder;
 import com.metamx.druid.VersionedIntervalTimeline;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.cache.Cache;
-import com.metamx.druid.client.cache.CacheBroker;
 import com.metamx.druid.client.selector.ServerSelector;
 import com.metamx.druid.partition.PartitionChunk;
 import com.metamx.druid.query.CacheStrategy;
@@ -64,6 +64,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 
 /**
@@ -74,19 +75,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
 
   private final QueryToolChestWarehouse warehouse;
   private final ServerView serverView;
-  private final CacheBroker cacheBroker;
+  private final Cache cache;
   private final ObjectMapper objectMapper;
 
   public CachingClusteredClient(
       QueryToolChestWarehouse warehouse,
       ServerView serverView,
-      CacheBroker cacheBroker,
+      Cache cache,
       ObjectMapper objectMapper
   )
   {
     this.warehouse = warehouse;
     this.serverView = serverView;
-    this.cacheBroker = cacheBroker;
+    this.cache = cache;
     this.objectMapper = objectMapper;
 
     serverView.registerSegmentCallback(
@@ -98,7 +99,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
           @Override
           public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
           {
-            CachingClusteredClient.this.cacheBroker.provideCache(segment.getIdentifier()).close();
+            CachingClusteredClient.this.cache.close(segment.getIdentifier());
             return ServerView.CallbackAction.CONTINUE;
           }
         }
@@ -111,7 +112,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
     final CacheStrategy<T, Query<T>> strategy = toolChest.getCacheStrategy(query);
 
-    final Map<DruidServer, List<SegmentDescriptor>> segs = Maps.newTreeMap();
+    final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
 
     final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
     final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
 
@@ -131,10 +133,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       return Sequences.empty();
     }
 
-    byte[] queryCacheKey = null;
-    if (strategy != null) {
-      queryCacheKey = strategy.computeCacheKey(query);
-    }
+    // build set of segments to query
+    Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
 
     for (Interval interval : rewrittenQuery.getIntervals()) {
       List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
@@ -146,55 +146,67 @@
             holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
         );
 
-        if (queryCacheKey == null) {
-          final DruidServer server = selector.pick();
-          List<SegmentDescriptor> descriptors = segs.get(server);
-
-          if (descriptors == null) {
-            descriptors = Lists.newArrayList();
-            segs.put(server, descriptors);
-          }
-
-          descriptors.add(descriptor);
-        }
-        else {
-          final Interval segmentQueryInterval = holder.getInterval();
-          final byte[] versionBytes = descriptor.getVersion().getBytes();
-
-          final byte[] cacheKey = ByteBuffer
-              .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
-              .putLong(segmentQueryInterval.getStartMillis())
-              .putLong(segmentQueryInterval.getEndMillis())
-              .put(versionBytes)
-              .putInt(descriptor.getPartitionNumber())
-              .put(queryCacheKey)
-              .array();
-          final String segmentIdentifier = selector.getSegment().getIdentifier();
-          final Cache cache = cacheBroker.provideCache(segmentIdentifier);
-          final byte[] cachedValue = cache.get(cacheKey);
-
-          if (useCache && cachedValue != null) {
-            cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
-          } else {
-            final DruidServer server = selector.pick();
-            List<SegmentDescriptor> descriptors = segs.get(server);
-
-            if (descriptors == null) {
-              descriptors = Lists.newArrayList();
-              segs.put(server, descriptors);
-            }
-
-            descriptors.add(descriptor);
-            cachePopulatorMap.put(
-                String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
-                new CachePopulator(cache, objectMapper, cacheKey)
-            );
-          }
-        }
+        segments.add(Pair.of(selector, descriptor));
        }
      }
    }
 
+    final byte[] queryCacheKey;
+    if(strategy != null) {
+      queryCacheKey = strategy.computeCacheKey(query);
+    } else {
+      queryCacheKey = null;
+    }
+
+    // Pull cached segments from cache and remove from set of segments to query
+    if(useCache && queryCacheKey != null) {
+      Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
+      for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
+        cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
+      }
+
+      Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
+
+      for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
+        Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
+        Cache.NamedKey segmentCacheKey = entry.getValue();
+
+        final ServerSelector selector = segment.lhs;
+        final SegmentDescriptor descriptor = segment.rhs;
+        final Interval segmentQueryInterval = descriptor.getInterval();
+
+        final byte[] cachedValue = cachedValues.get(segmentCacheKey);
+
+        if (cachedValue != null) {
+          cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
+
+          // remove cached segment from set of segments to query
+          segments.remove(segment);
+        }
+        else {
+          final String segmentIdentifier = selector.getSegment().getIdentifier();
+          cachePopulatorMap.put(
+              String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
+              new CachePopulator(cache, objectMapper, segmentCacheKey)
+          );
+        }
+      }
+    }
+
+    // Compile list of all segments not pulled from cache
+    for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+      final DruidServer server = segment.lhs.pick();
+      List<SegmentDescriptor> descriptors = serverSegments.get(server);
+
+      if (descriptors == null) {
+        descriptors = Lists.newArrayList();
+        serverSegments.put(server, descriptors);
+      }
+
+      descriptors.add(segment.rhs);
+    }
+
     return new LazySequence<T>(
         new Supplier<Sequence<T>>()
         {
@@ -264,7 +276,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     @SuppressWarnings("unchecked")
     private void addSequencesFromServer(ArrayList<Pair<DateTime, Sequence<T>>> listOfSequences)
     {
-      for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : segs.entrySet()) {
+      for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
        final DruidServer server = entry.getKey();
        final List<SegmentDescriptor> descriptors = entry.getValue();
 
@@ -328,13 +340,29 @@
     );
   }
 
+  private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
+  {
+    final Interval segmentQueryInterval = descriptor.getInterval();
+    final byte[] versionBytes = descriptor.getVersion().getBytes();
+
+    return new Cache.NamedKey(
+        segmentIdentifier, ByteBuffer
+            .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
+            .putLong(segmentQueryInterval.getStartMillis())
+            .putLong(segmentQueryInterval.getEndMillis())
+            .put(versionBytes)
+            .putInt(descriptor.getPartitionNumber())
+            .put(queryCacheKey).array()
+    );
+  }
+
   private static class CachePopulator
   {
     private final Cache cache;
     private final ObjectMapper mapper;
-    private final byte[] key;
+    private final Cache.NamedKey key;
 
-    public CachePopulator(Cache cache, ObjectMapper mapper, byte[] key)
+    public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
     {
       this.cache = cache;
       this.mapper = mapper;
Cache.java:

@@ -19,13 +19,63 @@
 
 package com.metamx.druid.client.cache;
 
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.Map;
+
 /**
- * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what
- * is actually going to be done.
  */
 public interface Cache
 {
-  public byte[] get(byte[] key);
-  public void put(byte[] key, byte[] value);
-  public void close();
+  public byte[] get(NamedKey key);
+  public void put(NamedKey key, byte[] value);
+  public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
+
+  public void close(String namespace);
+
+  public CacheStats getStats();
+
+  public class NamedKey
+  {
+    final public String namespace;
+    final public byte[] key;
+
+    public NamedKey(String namespace, byte[] key) {
+      Preconditions.checkArgument(namespace != null, "namespace must not be null");
+      Preconditions.checkArgument(key != null, "key must not be null");
+      this.namespace = namespace;
+      this.key = key;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      NamedKey namedKey = (NamedKey) o;
+
+      if (!namespace.equals(namedKey.namespace)) {
+        return false;
+      }
+      if (!Arrays.equals(key, namedKey.key)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      int result = namespace.hashCode();
+      result = 31 * result + Arrays.hashCode(key);
+      return result;
+    }
+  }
 }
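Because getBulk() returns a Map keyed by NamedKey, the value-based equals()/hashCode() above (namespace equality plus Arrays.equals on the key bytes) is what lets callers look results up with freshly constructed keys. A small illustrative check, assuming only the interface above; the class name and the example values are made up:

import com.metamx.druid.client.cache.Cache;

import java.util.HashMap;
import java.util.Map;

public class NamedKeySketch
{
  public static void main(String[] args)
  {
    // Two NamedKeys built from the same namespace string and equal byte arrays compare
    // equal and hash alike, so a freshly constructed key finds the entry in a map.
    Cache.NamedKey a = new Cache.NamedKey("segment_v1_0", new byte[]{1, 2, 3});
    Cache.NamedKey b = new Cache.NamedKey("segment_v1_0", new byte[]{1, 2, 3});

    Map<Cache.NamedKey, byte[]> results = new HashMap<Cache.NamedKey, byte[]>();
    results.put(a, new byte[]{42});

    System.out.println(a.equals(b));            // true
    System.out.println(results.get(b) != null); // true
  }
}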
CacheBroker.java (deleted):

@@ -1,28 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.client.cache;
-
-/**
- */
-public interface CacheBroker
-{
-  public CacheStats getStats();
-  public Cache provideCache(String identifier);
-}
CacheMonitor.java:

@@ -27,21 +27,21 @@ import com.metamx.metrics.AbstractMonitor;
  */
 public class CacheMonitor extends AbstractMonitor
 {
-  private final CacheBroker cacheBroker;
+  private final Cache cache;
 
   private volatile CacheStats prevCacheStats = null;
 
   public CacheMonitor(
-      CacheBroker cacheBroker
+      Cache cache
   )
   {
-    this.cacheBroker = cacheBroker;
+    this.cache = cache;
   }
 
   @Override
   public boolean doMonitor(ServiceEmitter emitter)
   {
-    final CacheStats currCacheStats = cacheBroker.getStats();
+    final CacheStats currCacheStats = cache.getStats();
     final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
 
     final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
MapCache.java (new file):

@@ -0,0 +1,158 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client.cache;
+
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ */
+public class MapCache implements Cache
+{
+  private final Map<ByteBuffer, byte[]> baseMap;
+  private final ByteCountingLRUMap byteCountingLRUMap;
+
+  private final Map<String, byte[]> namespaceId;
+  private final AtomicInteger ids;
+
+  private final Object clearLock = new Object();
+
+  private final AtomicLong hitCount = new AtomicLong(0);
+  private final AtomicLong missCount = new AtomicLong(0);
+
+  public static com.metamx.druid.client.cache.Cache create(final MapCacheConfig config)
+  {
+    return new MapCache(
+        new ByteCountingLRUMap(
+            config.getInitialSize(),
+            config.getLogEvictionCount(),
+            config.getSizeInBytes()
+        )
+    );
+  }
+
+  MapCache(
+      ByteCountingLRUMap byteCountingLRUMap
+  )
+  {
+    this.byteCountingLRUMap = byteCountingLRUMap;
+
+    this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
+
+    namespaceId = Maps.newHashMap();
+    ids = new AtomicInteger();
+  }
+
+  @Override
+  public CacheStats getStats()
+  {
+    return new CacheStats(
+        hitCount.get(),
+        missCount.get(),
+        byteCountingLRUMap.size(),
+        byteCountingLRUMap.getNumBytes(),
+        byteCountingLRUMap.getEvictionCount(),
+        0
+    );
+  }
+
+  @Override
+  public byte[] get(NamedKey key)
+  {
+    final byte[] retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key));
+    if (retVal == null) {
+      missCount.incrementAndGet();
+    } else {
+      hitCount.incrementAndGet();
+    }
+    return retVal;
+  }
+
+  @Override
+  public void put(NamedKey key, byte[] value)
+  {
+    synchronized (clearLock) {
+      baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
+    }
+  }
+
+  @Override
+  public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
+  {
+    Map<NamedKey, byte[]> retVal = Maps.newHashMap();
+    for(NamedKey key : keys) {
+      retVal.put(key, get(key));
+    }
+    return retVal;
+  }
+
+  @Override
+  public void close(String namespace)
+  {
+    byte[] idBytes;
+    synchronized (namespaceId) {
+      idBytes = getNamespaceId(namespace);
+      if(idBytes == null) return;
+
+      namespaceId.remove(namespace);
+    }
+    synchronized (clearLock) {
+      Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
+      while (iter.hasNext()) {
+        ByteBuffer next = iter.next();
+
+        if (next.get(0) == idBytes[0]
+            && next.get(1) == idBytes[1]
+            && next.get(2) == idBytes[2]
+            && next.get(3) == idBytes[3]) {
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  private byte[] getNamespaceId(final String identifier)
+  {
+    synchronized (namespaceId) {
+      byte[] idBytes = namespaceId.get(identifier);
+      if (idBytes != null) {
+        return idBytes;
+      }
+
+      idBytes = Ints.toByteArray(ids.getAndIncrement());
+      namespaceId.put(identifier, idBytes);
+      return idBytes;
+    }
+  }
+
+  private ByteBuffer computeKey(byte[] idBytes, byte[] key)
+  {
+    final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key);
+    retVal.rewind();
+    return retVal;
+  }
+}
MapCacheBroker.java (deleted):

@@ -1,165 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.client.cache;
-
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Ints;
-import com.metamx.common.ISE;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- */
-public class MapCacheBroker implements CacheBroker
-{
-  private final Map<ByteBuffer, byte[]> baseMap;
-  private final ByteCountingLRUMap byteCountingLRUMap;
-
-  private final Map<String, Cache> cacheCache;
-  private final AtomicInteger ids;
-
-  private final Object clearLock = new Object();
-
-  private final AtomicLong hitCount = new AtomicLong(0);
-  private final AtomicLong missCount = new AtomicLong(0);
-
-  public static CacheBroker create(final MapCacheBrokerConfig config)
-  {
-    return new MapCacheBroker(
-        new ByteCountingLRUMap(
-            config.getInitialSize(),
-            config.getLogEvictionCount(),
-            config.getSizeInBytes()
-        )
-    );
-  }
-
-  MapCacheBroker(
-      ByteCountingLRUMap byteCountingLRUMap
-  )
-  {
-    this.byteCountingLRUMap = byteCountingLRUMap;
-
-    this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
-
-    cacheCache = Maps.newHashMap();
-    ids = new AtomicInteger();
-  }
-
-
-  @Override
-  public CacheStats getStats()
-  {
-    return new CacheStats(
-        hitCount.get(),
-        missCount.get(),
-        byteCountingLRUMap.size(),
-        byteCountingLRUMap.getNumBytes(),
-        byteCountingLRUMap.getEvictionCount(),
-        0
-    );
-  }
-
-  @Override
-  public Cache provideCache(final String identifier)
-  {
-    synchronized (cacheCache) {
-      final Cache cachedCache = cacheCache.get(identifier);
-      if (cachedCache != null) {
-        return cachedCache;
-      }
-
-      final byte[] myIdBytes = Ints.toByteArray(ids.getAndIncrement());
-
-      final Cache theCache = new Cache()
-      {
-        volatile boolean open = true;
-
-        @Override
-        public byte[] get(byte[] key)
-        {
-          if (open) {
-            final byte[] retVal = baseMap.get(computeKey(key));
-            if (retVal == null) {
-              missCount.incrementAndGet();
-            } else {
-              hitCount.incrementAndGet();
-            }
-            return retVal;
-          }
-          throw new ISE("Cache for identifier[%s] is closed.", identifier);
-        }
-
-        @Override
-        public void put(byte[] key, byte[] value)
-        {
-          synchronized (clearLock) {
-            if (open) {
-              baseMap.put(computeKey(key), value);
-              return;
-            }
-          }
-          throw new ISE("Cache for identifier[%s] is closed.", identifier);
-        }
-
-        @Override
-        public void close()
-        {
-          synchronized (cacheCache) {
-            cacheCache.remove(identifier);
-          }
-          synchronized (clearLock) {
-            if (open) {
-              open = false;
-
-              Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
-              while (iter.hasNext()) {
-                ByteBuffer next = iter.next();
-
-                if (next.get(0) == myIdBytes[0]
-                    && next.get(1) == myIdBytes[1]
-                    && next.get(2) == myIdBytes[2]
-                    && next.get(3) == myIdBytes[3]) {
-                  iter.remove();
-                }
-              }
-            }
-          }
-        }
-
-        private ByteBuffer computeKey(byte[] key)
-        {
-          final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(myIdBytes).put(key);
-          retVal.rewind();
-          return retVal;
-        }
-      };
-
-      cacheCache.put(identifier, theCache);
-
-      return theCache;
-    }
-  }
-}
MapCacheBrokerConfig.java (renamed to MapCacheConfig.java):

@@ -24,7 +24,7 @@ import org.skife.config.Default;
 
 /**
 */
-public abstract class MapCacheBrokerConfig
+public abstract class MapCacheConfig
 {
   @Config("${prefix}.sizeInBytes")
   @Default("0")
MemcachedCacheBroker.java (renamed to MemcachedCache.java):

@@ -19,7 +19,9 @@
 
 package com.metamx.druid.client.cache;
 
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import net.iharder.base64.Base64;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
@@ -27,25 +29,28 @@ import net.spy.memcached.DefaultHashAlgorithm;
 import net.spy.memcached.FailureMode;
 import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.internal.BulkFuture;
 import net.spy.memcached.transcoders.SerializingTranscoder;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class MemcachedCacheBroker implements CacheBroker
+public class MemcachedCache implements Cache
 {
-  public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config)
+  public static MemcachedCache create(final MemcachedCacheConfig config)
   {
     try {
       SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
       // disable compression
       transcoder.setCompressionThreshold(Integer.MAX_VALUE);
 
-      return new MemcachedCacheBroker(
+      return new MemcachedCache(
           new MemcachedClient(
               new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
                                             .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@@ -74,7 +79,7 @@ public class MemcachedCacheBroker implements CacheBroker
   private final AtomicLong missCount = new AtomicLong(0);
   private final AtomicLong timeoutCount = new AtomicLong(0);
 
-  MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
+  MemcachedCache(MemcachedClientIF client, int timeout, int expiration) {
    this.timeout = timeout;
    this.expiration = expiration;
    this.client = client;
@@ -94,52 +99,94 @@ public class MemcachedCacheBroker implements CacheBroker
   }
 
   @Override
-  public Cache provideCache(final String identifier)
+  public byte[] get(NamedKey key)
   {
-    return new Cache()
-    {
-      @Override
-      public byte[] get(byte[] key)
-      {
-        Future<Object> future = client.asyncGet(computeKey(identifier, key));
-        try {
-          byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
-          if(bytes != null) {
-            hitCount.incrementAndGet();
-          }
-          else {
-            missCount.incrementAndGet();
-          }
-          return bytes;
-        }
-        catch(TimeoutException e) {
-          timeoutCount.incrementAndGet();
-          future.cancel(false);
-          return null;
-        }
-        catch(InterruptedException e) {
-          throw Throwables.propagate(e);
-        }
-        catch(ExecutionException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-
-      @Override
-      public void put(byte[] key, byte[] value)
-      {
-        client.set(computeKey(identifier, key), expiration, value);
-      }
-
-      @Override
-      public void close()
-      {
-        // no resources to cleanup
-      }
-    };
-  }
-
-  private String computeKey(String identifier, byte[] key) {
-    return identifier + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES);
+    Future<Object> future = client.asyncGet(computeKeyString(key));
+    try {
+      byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
+      if(bytes != null) {
+        hitCount.incrementAndGet();
+      }
+      else {
+        missCount.incrementAndGet();
+      }
+      return bytes;
+    }
+    catch(TimeoutException e) {
+      timeoutCount.incrementAndGet();
+      future.cancel(false);
+      return null;
+    }
+    catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+    catch(ExecutionException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void put(NamedKey key, byte[] value)
+  {
+    client.set(computeKeyString(key), expiration, value);
+  }
+
+  @Override
+  public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
+  {
+    Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
+        keys,
+        new Function<NamedKey, String>()
+        {
+          @Override
+          public String apply(
+              @Nullable NamedKey input
+          )
+          {
+            return computeKeyString(input);
+          }
+        }
+    );
+
+    BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet());
+
+    try {
+      Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
+
+      if(future.isTimeout()) {
+        future.cancel(false);
+        timeoutCount.incrementAndGet();
+      }
+      missCount.addAndGet(keyLookup.size() - some.size());
+      hitCount.addAndGet(some.size());
+
+      Map<NamedKey, byte[]> results = Maps.newHashMap();
+      for(Map.Entry<String, Object> entry : some.entrySet()) {
+        results.put(
+            keyLookup.get(entry.getKey()),
+            (byte[])entry.getValue()
+        );
+      }
+
+      return results;
+    }
+    catch(InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(e);
+    }
+    catch(ExecutionException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void close(String namespace)
+  {
+    // no resources to cleanup
+  }
+
+  private static String computeKeyString(NamedKey key) {
+    return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES);
   }
 }
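For the memcached backend, each NamedKey is flattened to the plain string key namespace + ":" + Base64(key), and getBulk() leans on the client's asyncGetBulk()/getSome(): entries that did not come back before the timeout are simply absent from the returned map and are counted as misses. A caller can therefore treat an absent entry the same way as a single-key miss; a minimal sketch, assuming only the Cache interface from this diff (the class name, helper, and fallback comment are invented):

import com.metamx.druid.client.cache.Cache;

import java.util.List;
import java.util.Map;

public class PartialBulkResultSketch
{
  // getBulk() may return fewer entries than were asked for (for example on a memcached
  // timeout); a key that is absent from the result map is a cache miss, not an error.
  public static int countMisses(Cache cache, List<Cache.NamedKey> keys)
  {
    Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
    int misses = 0;
    for (Cache.NamedKey key : keys) {
      if (results.get(key) == null) {
        misses++; // the caller falls back to querying the underlying data source
      }
    }
    return misses;
  }
}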
MemcachedCacheBrokerConfig.java (renamed to MemcachedCacheConfig.java):

@@ -3,7 +3,7 @@ package com.metamx.druid.client.cache;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
-public abstract class MemcachedCacheBrokerConfig
+public abstract class MemcachedCacheConfig
 {
   @Config("${prefix}.expiration")
   @Default("31536000")
BrokerNode.java:

@@ -34,13 +34,13 @@ import com.metamx.druid.client.BrokerServerView;
 import com.metamx.druid.client.CachingClusteredClient;
 import com.metamx.druid.client.ClientConfig;
 import com.metamx.druid.client.ClientInventoryManager;
-import com.metamx.druid.client.cache.CacheBroker;
+import com.metamx.druid.client.cache.Cache;
 import com.metamx.druid.client.cache.CacheConfig;
 import com.metamx.druid.client.cache.CacheMonitor;
-import com.metamx.druid.client.cache.MapCacheBroker;
-import com.metamx.druid.client.cache.MapCacheBrokerConfig;
-import com.metamx.druid.client.cache.MemcachedCacheBroker;
-import com.metamx.druid.client.cache.MemcachedCacheBrokerConfig;
+import com.metamx.druid.client.cache.MapCache;
+import com.metamx.druid.client.cache.MapCacheConfig;
+import com.metamx.druid.client.cache.MemcachedCache;
+import com.metamx.druid.client.cache.MemcachedCacheConfig;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
 import com.metamx.druid.jackson.DefaultObjectMapper;
@@ -78,7 +78,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
 
   private QueryToolChestWarehouse warehouse = null;
   private HttpClient brokerHttpClient = null;
-  private CacheBroker cacheBroker = null;
+  private Cache cache = null;
 
   private boolean useDiscovery = true;
 
@@ -122,15 +122,15 @@ public class BrokerNode extends QueryableNode<BrokerNode>
     return this;
   }
 
-  public CacheBroker getCacheBroker()
+  public Cache getCache()
   {
     initializeCacheBroker();
-    return cacheBroker;
+    return cache;
   }
 
-  public BrokerNode setCacheBroker(CacheBroker cacheBroker)
+  public BrokerNode setCache(Cache cache)
   {
-    checkFieldNotSetAndSet("cacheBroker", cacheBroker);
+    checkFieldNotSetAndSet("cache", cache);
     return this;
   }
 
@@ -185,7 +185,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
     final Lifecycle lifecycle = getLifecycle();
 
     final List<Monitor> monitors = getMonitors();
-    monitors.add(new CacheMonitor(cacheBroker));
+    monitors.add(new CacheMonitor(cache));
     startMonitoring(monitors);
 
     final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
@@ -194,7 +194,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
     );
     lifecycle.addManagedInstance(clientInventoryManager);
 
-    final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper());
+    final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
     lifecycle.addManagedInstance(baseClient);
 
 
@@ -239,25 +239,25 @@ public class BrokerNode extends QueryableNode<BrokerNode>
 
   private void initializeCacheBroker()
   {
-    if (cacheBroker == null) {
+    if (cache == null) {
       String cacheType = getConfigFactory()
           .build(CacheConfig.class)
           .getType();
 
       if (cacheType.equals(CACHE_TYPE_LOCAL)) {
-        setCacheBroker(
-            MapCacheBroker.create(
+        setCache(
+            MapCache.create(
                 getConfigFactory().buildWithReplacements(
-                    MapCacheBrokerConfig.class,
+                    MapCacheConfig.class,
                     ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
                 )
             )
         );
       } else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) {
-        setCacheBroker(
-            MemcachedCacheBroker.create(
+        setCache(
+            MemcachedCache.create(
                 getConfigFactory().buildWithReplacements(
-                    MemcachedCacheBrokerConfig.class,
+                    MemcachedCacheConfig.class,
                     ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
                 )
            )
MapCacheBrokerTest.java:

@@ -31,56 +31,53 @@ public class MapCacheBrokerTest
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
   private ByteCountingLRUMap baseMap;
-  private MapCacheBroker broker;
+  private MapCache cache;
 
   @Before
   public void setUp() throws Exception
   {
     baseMap = new ByteCountingLRUMap(1024 * 1024);
-    broker = new MapCacheBroker(baseMap);
+    cache = new MapCache(baseMap);
   }
 
   @Test
   public void testSanity() throws Exception
   {
-    Cache aCache = broker.provideCache("a");
-    Cache theCache = broker.provideCache("the");
-
-    Assert.assertNull(aCache.get(HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
     Assert.assertEquals(0, baseMap.size());
-    put(aCache, HI, 1);
+    put(cache, "a", HI, 1);
     Assert.assertEquals(1, baseMap.size());
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(theCache.get(HI));
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
 
-    put(theCache, HI, 2);
+    put(cache, "the", HI, 2);
     Assert.assertEquals(2, baseMap.size());
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertEquals(2, get(theCache, HI));
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertEquals(2, get(cache, "the", HI));
 
-    put(theCache, HO, 10);
+    put(cache, "the", HO, 10);
     Assert.assertEquals(3, baseMap.size());
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(aCache.get(HO));
-    Assert.assertEquals(2, get(theCache, HI));
-    Assert.assertEquals(10, get(theCache, HO));
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
+    Assert.assertEquals(2, get(cache, "the", HI));
+    Assert.assertEquals(10, get(cache, "the", HO));
 
-    theCache.close();
+    cache.close("the");
     Assert.assertEquals(1, baseMap.size());
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(aCache.get(HO));
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
 
-    aCache.close();
+    cache.close("a");
     Assert.assertEquals(0, baseMap.size());
   }
 
-  public void put(Cache cache, byte[] key, Integer value)
+  public void put(Cache cache, String namespace, byte[] key, Integer value)
   {
-    cache.put(key, Ints.toByteArray(value));
+    cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
   }
 
-  public int get(Cache cache, byte[] key)
+  public int get(Cache cache, String namespace, byte[] key)
   {
-    return Ints.fromByteArray(cache.get(key));
+    return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
   }
 }
MemcachedCacheBrokerBenchmark.java:

@@ -3,6 +3,7 @@ package com.metamx.druid.client.cache;
 import com.google.caliper.Param;
 import com.google.caliper.Runner;
 import com.google.caliper.SimpleBenchmark;
+import com.google.common.collect.Lists;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.DefaultHashAlgorithm;
@@ -11,17 +12,19 @@ import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.transcoders.SerializingTranscoder;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
 {
   private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
+  public static final String NAMESPACE = "default";
 
-  private MemcachedCacheBroker broker;
+  private MemcachedCache cache;
   private MemcachedClientIF client;
 
-  private Cache cache;
   private static byte[] randBytes;
 
   @Param({"localhost:11211"}) String hosts;
@@ -39,8 +42,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
     // disable compression
     transcoder.setCompressionThreshold(Integer.MAX_VALUE);
 
-    System.out.println(String.format("Using memcached hosts [%s]", hosts));
-
     client = new MemcachedClient(
         new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
                                       .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@@ -53,14 +54,12 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
         AddrUtil.getAddresses(hosts)
     );
 
-    broker = new MemcachedCacheBroker(
+    cache = new MemcachedCache(
         client,
-        500, // 500 milliseconds
+        30000, // 30 seconds
         3600 // 1 hour
     );
 
-    cache = broker.provideCache("default");
-
     randBytes = new byte[objectSize * 1024];
     new Random(0).nextBytes(randBytes);
@@ -69,15 +68,14 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
   @Override
   protected void tearDown() throws Exception
   {
-    client.flush();
-    client.shutdown();
+    client.shutdown(1, TimeUnit.MINUTES);
   }
 
   public void timePutObjects(int reps) {
     for(int i = 0; i < reps; ++i) {
       for(int k = 0; k < objectCount; ++k) {
-        String key = BASE_KEY + i;
-        cache.put(key.getBytes(), randBytes);
+        String key = BASE_KEY + k;
+        cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
       }
       // make sure the write queue is empty
       client.waitForQueues(1, TimeUnit.HOURS);
@@ -89,8 +87,25 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
     long count = 0;
     for (int i = 0; i < reps; i++) {
       for(int k = 0; k < objectCount; ++k) {
-        String key = BASE_KEY + i;
-        bytes = cache.get(key.getBytes());
+        String key = BASE_KEY + k;
+        bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
+        count += bytes.length;
+      }
+    }
+    return count;
+  }
+
+  public long timeBulkGetObjects(int reps) {
+    long count = 0;
+    for (int i = 0; i < reps; i++) {
+      List<Cache.NamedKey> keys = Lists.newArrayList();
+      for(int k = 0; k < objectCount; ++k) {
+        String key = BASE_KEY + k;
+        keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
+      }
+      Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
+      for(Cache.NamedKey key : keys) {
+        byte[] bytes = results.get(key);
         count += bytes.length;
       }
     }
MemcachedCacheBrokerTest.java:

@@ -19,6 +19,8 @@
 
 package com.metamx.druid.client.cache;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
 import net.spy.memcached.CASResponse;
 import net.spy.memcached.CASValue;
@@ -27,6 +29,7 @@ import net.spy.memcached.ConnectionObserver;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.NodeLocator;
 import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.ops.OperationStatus;
 import net.spy.memcached.transcoders.SerializingTranscoder;
 import net.spy.memcached.transcoders.Transcoder;
 import org.junit.Assert;
@@ -51,51 +54,70 @@ public class MemcachedCacheBrokerTest
 {
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
-  private MemcachedCacheBroker broker;
+  private MemcachedCache cache;
 
   @Before
   public void setUp() throws Exception
   {
     MemcachedClientIF client = new MockMemcachedClient();
-    broker = new MemcachedCacheBroker(client, 500, 3600);
+    cache = new MemcachedCache(client, 500, 3600);
   }
 
   @Test
   public void testSanity() throws Exception
   {
-    Cache aCache = broker.provideCache("a");
-    Cache theCache = broker.provideCache("the");
-
-    Assert.assertNull(aCache.get(HI));
-    put(aCache, HI, 1);
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(theCache.get(HI));
-
-    put(theCache, HI, 2);
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertEquals(2, get(theCache, HI));
-
-    put(theCache, HO, 10);
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(aCache.get(HO));
-    Assert.assertEquals(2, get(theCache, HI));
-    Assert.assertEquals(10, get(theCache, HO));
-
-    theCache.close();
-    Assert.assertEquals(1, get(aCache, HI));
-    Assert.assertNull(aCache.get(HO));
-
-    aCache.close();
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
+    put(cache, "a", HI, 1);
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
+
+    put(cache, "the", HI, 2);
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertEquals(2, get(cache, "the", HI));
+
+    put(cache, "the", HO, 10);
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
+    Assert.assertEquals(2, get(cache, "the", HI));
+    Assert.assertEquals(10, get(cache, "the", HO));
+
+    cache.close("the");
+    Assert.assertEquals(1, get(cache, "a", HI));
+    Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
+
+    cache.close("a");
   }
 
-  public void put(Cache cache, byte[] key, Integer value)
+  @Test
+  public void testGetBulk() throws Exception
   {
-    cache.put(key, Ints.toByteArray(value));
+    Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
+
+    put(cache, "the", HI, 2);
+    put(cache, "the", HO, 10);
+
+    Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
+    Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
+
+    Map<Cache.NamedKey, byte[]> result = cache.getBulk(
+        Lists.newArrayList(
+            key1,
+            key2
+        )
+    );
+
+    Assert.assertEquals(2, Ints.fromByteArray(result.get(key1)));
+    Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
   }
 
-  public int get(Cache cache, byte[] key)
+  public void put(Cache cache, String namespace, byte[] key, Integer value)
   {
-    return Ints.fromByteArray(cache.get(key));
+    cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
+  }
+
+  public int get(Cache cache, String namespace, byte[] key)
+  {
+    return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
   }
 }
@@ -365,9 +387,67 @@ class MockMemcachedClient implements MemcachedClientIF
   }
 
   @Override
-  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Transcoder<T> tc)
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(final Iterator<String> keys, final Transcoder<T> tc)
   {
-    throw new UnsupportedOperationException("not implemented");
+    return new BulkFuture<Map<String, T>>()
+    {
+      @Override
+      public boolean isTimeout()
+      {
+        return false;
+      }
+
+      @Override
+      public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
+      {
+        return get();
+      }
+
+      @Override
+      public OperationStatus getStatus()
+      {
+        return null;
+      }
+
+      @Override
+      public boolean cancel(boolean b)
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled()
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isDone()
+      {
+        return true;
+      }
+
+      @Override
+      public Map<String, T> get() throws InterruptedException, ExecutionException
+      {
+        Map<String, T> retVal = Maps.newHashMap();
+
+        while(keys.hasNext()) {
+          String key = keys.next();
+          CachedData data = theMap.get(key);
+          retVal.put(key, data != null ? tc.decode(data) : null);
+        }
+
+        return retVal;
+      }
+
+      @Override
+      public Map<String, T> get(long l, TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException, TimeoutException
+      {
+        return get();
+      }
+    };
   }
 
   @Override
@@ -383,9 +463,9 @@ class MockMemcachedClient implements MemcachedClientIF
   }
 
   @Override
-  public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys)
+  public BulkFuture<Map<String, Object>> asyncGetBulk(final Collection<String> keys)
   {
-    throw new UnsupportedOperationException("not implemented");
+    return asyncGetBulk(keys.iterator(), transcoder);
   }
 
   @Override