mirror of https://github.com/apache/druid.git
Merge pull request #417 from metamx/fix-caching-bug
Fix caching bug and add CachingClusteredClient tests
This commit is contained in:
commit
1745aa744b
|
@ -128,7 +128,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||
|
||||
|
||||
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<String, String>();
|
||||
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
final String priority = query.getContextValue("priority", "0");
|
||||
contextBuilder.put("priority", priority);
|
||||
|
@ -176,32 +176,37 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
queryCacheKey = null;
|
||||
}
|
||||
|
||||
// Pull cached segments from cache and remove from set of segments to query
|
||||
if (useCache && queryCacheKey != null) {
|
||||
if (queryCacheKey != null) {
|
||||
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
|
||||
for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
|
||||
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
|
||||
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
|
||||
final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
|
||||
segment.lhs.getSegment().getIdentifier(),
|
||||
segment.rhs,
|
||||
queryCacheKey
|
||||
);
|
||||
cacheKeys.put(segment, segmentCacheKey);
|
||||
}
|
||||
|
||||
Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
|
||||
// Pull cached segments from cache and remove from set of segments to query
|
||||
final Map<Cache.NamedKey, byte[]> cachedValues;
|
||||
if (useCache) {
|
||||
cachedValues = cache.getBulk(cacheKeys.values());
|
||||
} else {
|
||||
cachedValues = ImmutableMap.of();
|
||||
}
|
||||
|
||||
for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
|
||||
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
|
||||
Cache.NamedKey segmentCacheKey = entry.getValue();
|
||||
|
||||
final ServerSelector selector = segment.lhs;
|
||||
final SegmentDescriptor descriptor = segment.rhs;
|
||||
final Interval segmentQueryInterval = descriptor.getInterval();
|
||||
final Interval segmentQueryInterval = segment.rhs.getInterval();
|
||||
|
||||
final byte[] cachedValue = cachedValues.get(segmentCacheKey);
|
||||
|
||||
if (cachedValue != null) {
|
||||
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
|
||||
|
||||
// remove cached segment from set of segments to query
|
||||
segments.remove(segment);
|
||||
} else {
|
||||
final String segmentIdentifier = selector.getSegment().getIdentifier();
|
||||
cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue));
|
||||
} else if (populateCache) {
|
||||
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier();
|
||||
cachePopulatorMap.put(
|
||||
String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
|
||||
new CachePopulator(cache, objectMapper, segmentCacheKey)
|
||||
|
@ -229,7 +234,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
}
|
||||
}
|
||||
|
||||
return new LazySequence<T>(
|
||||
return new LazySequence<>(
|
||||
new Supplier<Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
|
@ -265,7 +270,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
|
||||
for (Pair<DateTime, byte[]> cachedResultPair : cachedResults) {
|
||||
final byte[] cachedResult = cachedResultPair.rhs;
|
||||
Sequence<Object> cachedSequence = new BaseSequence<Object, Iterator<Object>>(
|
||||
Sequence<Object> cachedSequence = new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
|
||||
{
|
||||
@Override
|
||||
|
@ -331,9 +336,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
String segmentIdentifier = value.getSegmentId();
|
||||
final Iterable<T> segmentResults = value.getResults();
|
||||
|
||||
cachePopulatorMap.get(
|
||||
CachePopulator cachePopulator = cachePopulatorMap.get(
|
||||
String.format("%s_%s", segmentIdentifier, value.getInterval())
|
||||
).populate(Iterables.transform(segmentResults, prepareForCache));
|
||||
);
|
||||
if(cachePopulator != null) {
|
||||
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
|
||||
}
|
||||
|
||||
return Sequences.simple(
|
||||
Iterables.transform(
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class RangeIterable implements Iterable<Integer>
|
||||
{
|
||||
private final int end;
|
||||
private final int start;
|
||||
private final int increment;
|
||||
|
||||
public RangeIterable(
|
||||
int end
|
||||
)
|
||||
{
|
||||
this(0, end);
|
||||
}
|
||||
|
||||
public RangeIterable(
|
||||
int start,
|
||||
int end
|
||||
)
|
||||
{
|
||||
this(start, end, 1);
|
||||
}
|
||||
|
||||
public RangeIterable(
|
||||
int start,
|
||||
int end,
|
||||
final int i
|
||||
)
|
||||
{
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
this.increment = i;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return new Iterator<Integer>()
|
||||
{
|
||||
private int curr = start;
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return curr < end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer next()
|
||||
{
|
||||
try {
|
||||
return curr;
|
||||
}
|
||||
finally {
|
||||
curr += increment;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue