Refactor CachingClusteredClient.run() (#4489)

* Refactor CachingClusteredClient

* Comments

* Refactoring

* Readability fixes
Roman Leventov 2017-07-23 17:10:36 +03:00 committed by Jihoon Son
parent b154ff0d4a
commit 7408a7c4ed
22 changed files with 496 additions and 457 deletions

@ -22,6 +22,8 @@ package io.druid.timeline;
import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.List;
public interface TimelineLookup<VersionType, ObjectType> public interface TimelineLookup<VersionType, ObjectType>
{ {
@ -35,7 +37,7 @@ public interface TimelineLookup<VersionType, ObjectType>
* @return Holders representing the interval that the objects exist for, PartitionHolders * @return Holders representing the interval that the objects exist for, PartitionHolders
* are guaranteed to be complete. Holders returned sorted by the interval. * are guaranteed to be complete. Holders returned sorted by the interval.
*/ */
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval); public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
/** /**
* Does a lookup for the objects representing the given time interval. Will also return * Does a lookup for the objects representing the given time interval. Will also return
@ -46,7 +48,7 @@ public interface TimelineLookup<VersionType, ObjectType>
* @return Holders representing the interval that the objects exist for, PartitionHolders * @return Holders representing the interval that the objects exist for, PartitionHolders
* can be incomplete. Holders returned sorted by the interval. * can be incomplete. Holders returned sorted by the interval.
*/ */
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval); public List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version); public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
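With lookup() and lookupWithIncompletePartitions() now returning List rather than Iterable, callers can flatten lookups over several intervals with the Stream API directly, which the refactored CachingClusteredClient does further down in this diff. A minimal sketch of that call pattern; lookupAll here is a hypothetical helper, not part of the change:

import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;
import java.util.List;
import java.util.stream.Collectors;

class TimelineLookupSketch
{
  // Hypothetical helper: flatten the holders found for each query interval into one list.
  static <V, O> List<TimelineObjectHolder<V, O>> lookupAll(
      TimelineLookup<V, O> timeline,
      List<Interval> intervals
  )
  {
    return intervals.stream()
                    .flatMap(interval -> timeline.lookup(interval).stream())
                    .collect(Collectors.toList());
  }
}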

@ -213,7 +213,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
} }
@Override @Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval) public List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval)
{ {
try { try {
lock.readLock().lock(); lock.readLock().lock();

@ -19,18 +19,18 @@
package io.druid.java.util.common.guava; package io.druid.java.util.common.guava;
import com.google.common.base.Function; import java.util.function.Function;
/** /**
*/ */
public class MappedSequence<T, Out> implements Sequence<Out> public class MappedSequence<T, Out> implements Sequence<Out>
{ {
private final Sequence<T> baseSequence; private final Sequence<T> baseSequence;
private final Function<T, Out> fn; private final Function<? super T, ? extends Out> fn;
public MappedSequence( public MappedSequence(
Sequence<T> baseSequence, Sequence<T> baseSequence,
Function<T, Out> fn Function<? super T, ? extends Out> fn
) )
{ {
this.baseSequence = baseSequence; this.baseSequence = baseSequence;
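Replacing Guava's Function with java.util.function.Function and adding ? super / ? extends wildcards follows the usual PECS rule, so callers may pass functions that consume a supertype of T and produce a subtype of Out. A small self-contained illustration; the class and method names are made up for the example:

import java.util.function.Function;

class WildcardSignatureDemo
{
  // The wildcarded parameter accepts any function that consumes at least T and produces at most Out.
  static <T, Out> Out applyOnce(T value, Function<? super T, ? extends Out> fn)
  {
    return fn.apply(value);
  }

  public static void main(String[] args)
  {
    Function<Object, Integer> hash = Object::hashCode;
    // Accepted with T = String, Out = Number; the stricter Function<String, Number> signature would reject it.
    Number hashed = applyOnce("druid", hash);
    System.out.println(hashed);
  }
}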

@ -19,16 +19,16 @@
package io.druid.java.util.common.guava; package io.druid.java.util.common.guava;
import com.google.common.base.Function; import java.util.function.Function;
/** /**
*/ */
public class MappingAccumulator<OutType, InType, MappedType> implements Accumulator<OutType, InType> public class MappingAccumulator<OutType, InType, MappedType> implements Accumulator<OutType, InType>
{ {
private final Function<InType, MappedType> fn; private final Function<? super InType, ? extends MappedType> fn;
private final Accumulator<OutType, MappedType> accumulator; private final Accumulator<OutType, MappedType> accumulator;
public MappingAccumulator(Function<InType, MappedType> fn, Accumulator<OutType, MappedType> accumulator) MappingAccumulator(Function<? super InType, ? extends MappedType> fn, Accumulator<OutType, MappedType> accumulator)
{ {
this.fn = fn; this.fn = fn;
this.accumulator = accumulator; this.accumulator = accumulator;

@ -19,17 +19,17 @@
package io.druid.java.util.common.guava; package io.druid.java.util.common.guava;
import com.google.common.base.Function; import java.util.function.Function;
/** /**
*/ */
public class MappingYieldingAccumulator<OutType, InType, MappedType> extends YieldingAccumulator<OutType, InType> public class MappingYieldingAccumulator<OutType, InType, MappedType> extends YieldingAccumulator<OutType, InType>
{ {
private final Function<InType, MappedType> fn; private final Function<? super InType, ? extends MappedType> fn;
private final YieldingAccumulator<OutType, MappedType> baseAccumulator; private final YieldingAccumulator<OutType, MappedType> baseAccumulator;
public MappingYieldingAccumulator( public MappingYieldingAccumulator(
Function<InType, MappedType> fn, Function<? super InType, ? extends MappedType> fn,
YieldingAccumulator<OutType, MappedType> baseAccumulator YieldingAccumulator<OutType, MappedType> baseAccumulator
) )
{ {

@ -31,16 +31,16 @@ import java.util.PriorityQueue;
*/ */
public class MergeSequence<T> extends YieldingSequenceBase<T> public class MergeSequence<T> extends YieldingSequenceBase<T>
{ {
private final Ordering<T> ordering; private final Ordering<? super T> ordering;
private final Sequence<Sequence<T>> baseSequences; private final Sequence<? extends Sequence<T>> baseSequences;
public MergeSequence( public MergeSequence(
Ordering<T> ordering, Ordering<? super T> ordering,
Sequence<Sequence<T>> baseSequences Sequence<? extends Sequence<? extends T>> baseSequences
) )
{ {
this.ordering = ordering; this.ordering = ordering;
this.baseSequences = baseSequences; this.baseSequences = (Sequence<? extends Sequence<T>>) baseSequences;
} }
@Override @Override
@ -62,37 +62,32 @@ public class MergeSequence<T> extends YieldingSequenceBase<T>
pQueue = baseSequences.accumulate( pQueue = baseSequences.accumulate(
pQueue, pQueue,
new Accumulator<PriorityQueue<Yielder<T>>, Sequence<T>>() (queue, in) -> {
{ final Yielder<T> yielder = in.toYielder(
@Override null,
public PriorityQueue<Yielder<T>> accumulate(PriorityQueue<Yielder<T>> queue, Sequence<T> in) new YieldingAccumulator<T, T>()
{ {
final Yielder<T> yielder = in.toYielder( @Override
null, public T accumulate(T accumulated, T in)
new YieldingAccumulator<T, T>()
{ {
@Override yield();
public T accumulate(T accumulated, T in) return in;
{
yield();
return in;
}
} }
); }
);
if (!yielder.isDone()) { if (!yielder.isDone()) {
queue.add(yielder); queue.add(yielder);
} else { } else {
try { try {
yielder.close(); yielder.close();
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw new RuntimeException(e);
}
} }
return queue;
} }
return queue;
} }
); );
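The rewritten accumulator above keeps the same k-way merge over a priority queue of Yielders, with Throwables.propagate replaced by a plain RuntimeException. A standalone sketch of the same merge idea using Guava's PeekingIterator in place of Yielders; it assumes every input list is already sorted:

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch
{
  static <T extends Comparable<T>> List<T> merge(List<List<T>> sortedLists)
  {
    // Order the queue by each iterator's current head, much as MergeSequence orders its Yielders.
    Comparator<PeekingIterator<T>> byHead = (left, right) -> left.peek().compareTo(right.peek());
    PriorityQueue<PeekingIterator<T>> queue = new PriorityQueue<>(byHead);
    for (List<T> list : sortedLists) {
      if (!list.isEmpty()) {
        queue.add(Iterators.peekingIterator(list.iterator()));
      }
    }
    List<T> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      PeekingIterator<T> smallest = queue.poll();
      merged.add(smallest.next());
      if (smallest.hasNext()) {
        queue.add(smallest); // re-insert so it is re-positioned by its new head
      }
    }
    return merged;
  }
}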

@ -19,6 +19,11 @@
package io.druid.java.util.common.guava; package io.druid.java.util.common.guava;
import com.google.common.collect.Ordering;
import java.util.concurrent.Executor;
import java.util.function.Function;
/** /**
* A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose * A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines * a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines
@ -57,4 +62,22 @@ public interface Sequence<T>
* @see Yielder * @see Yielder
*/ */
<OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator); <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
default <U> Sequence<U> map(Function<? super T, ? extends U> mapper)
{
return new MappedSequence<>(this, mapper);
}
default <R> Sequence<R> flatMerge(
Function<? super T, ? extends Sequence<? extends R>> mapper,
Ordering<? super R> ordering
)
{
return new MergeSequence<>(ordering, this.map(mapper));
}
default Sequence<T> withEffect(Runnable effect, Executor effectExecutor)
{
return Sequences.withEffect(this, effect, effectExecutor);
}
} }
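A hypothetical usage sketch of the new default methods. Sequences.simple() is used elsewhere in this diff; Sequences.toList() is assumed available to materialize the result, and the data is made up:

import com.google.common.collect.Ordering;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SequenceDefaultMethodsSketch
{
  public static void main(String[] args)
  {
    // Two already-sorted runs of numbers, wrapped as a Sequence of lists.
    Sequence<List<Integer>> sortedRuns =
        Sequences.simple(Arrays.asList(Arrays.asList(1, 4), Arrays.asList(2, 3)));

    // flatMerge() merges the mapped sub-sequences by the given ordering, which is how the
    // refactored run() combines its cached and per-server sequences.
    Sequence<Integer> merged =
        sortedRuns.flatMerge(run -> Sequences.simple(run), Ordering.<Integer>natural());

    // map() simply wraps this sequence in a MappedSequence.
    Sequence<String> rendered = merged.map(Object::toString);

    List<String> result = Sequences.toList(rendered, new ArrayList<>());
    System.out.println(result); // [1, 2, 3, 4]
  }
}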

@ -80,9 +80,9 @@ public class Sequences
return new ConcatSequence<>(sequences); return new ConcatSequence<>(sequences);
} }
public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<From, To> fn) public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<? super From, ? extends To> fn)
{ {
return new MappedSequence<>(sequence, fn); return new MappedSequence<>(sequence, fn::apply);
} }
public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred) public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred)
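Sequences.map() now takes a java.util.function.Function, so call sites that still hold a Guava com.google.common.base.Function (for example the tool-chest manipulator functions later in this diff) adapt it with a method reference to apply(). A minimal illustration with made-up values:

import java.util.function.Function;

class GuavaFunctionAdapterSketch
{
  public static void main(String[] args)
  {
    com.google.common.base.Function<String, Integer> guavaFn = String::length;
    // fn::apply turns the Guava Function into a java.util.function.Function on the fly.
    Function<String, Integer> javaFn = guavaFn::apply;
    System.out.println(javaFn.apply("druid")); // 5
  }
}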

@ -19,12 +19,12 @@
package io.druid.java.util.common.guava; package io.druid.java.util.common.guava;
import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import org.junit.Test; import org.junit.Test;
import java.util.List; import java.util.List;
import java.util.function.Function;
/** /**
*/ */
@ -51,7 +51,7 @@ public class MappedSequenceTest
SequenceTestHelper.testAll( SequenceTestHelper.testAll(
StringUtils.format("Run %,d: ", i), StringUtils.format("Run %,d: ", i),
new MappedSequence<>(Sequences.simple(vals), fn), new MappedSequence<>(Sequences.simple(vals), fn),
Lists.transform(vals, fn) Lists.transform(vals, fn::apply)
); );
} }
} }

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.List; import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
/** /**
*/ */
@ -64,6 +66,12 @@ public class BySegmentResultValueClass<T> implements BySegmentResultValue<T>
return interval; return interval;
} }
public <U> BySegmentResultValueClass<U> mapResults(Function<? super T, ? extends U> mapper)
{
List<U> mappedResults = results.stream().map(mapper).collect(Collectors.toList());
return new BySegmentResultValueClass<>(mappedResults, segmentId, interval);
}
@Override @Override
public String toString() public String toString()
{ {

@ -117,7 +117,7 @@ public final class QueryPlus<T>
/** /**
* Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}. * Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}.
*/ */
public QueryPlus<T> withQuery(Query<T> replacementQuery) public <U> QueryPlus<U> withQuery(Query<U> replacementQuery)
{ {
return new QueryPlus<>(replacementQuery, queryMetrics); return new QueryPlus<>(replacementQuery, queryMetrics);
} }
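Generifying withQuery() decouples the replacement query's result type from the original QueryPlus, which the new getAndCacheServerResults() later in this diff relies on when it substitutes the bySegment downstream query. A minimal analog using a made-up Holder class rather than QueryPlus itself:

class Holder<T>
{
  private final T value;

  Holder(T value)
  {
    this.value = value;
  }

  // Before the change the equivalent method would be Holder<T> with(T replacement),
  // pinning the result type; the type parameter U lets the caller switch types.
  <U> Holder<U> with(U replacement)
  {
    return new Holder<>(replacement);
  }

  public static void main(String[] args)
  {
    Holder<Integer> numbers = new Holder<>(42);
    Holder<String> words = numbers.with("forty-two");
    System.out.println(words.value);
  }
}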

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment; import io.druid.timeline.LogicalSegment;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
/** /**
@ -122,6 +123,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* *
* @return A CacheStrategy that can be used to populate and read from the Cache * @return A CacheStrategy that can be used to populate and read from the Cache
*/ */
@Nullable
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query) public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
{ {
return null; return null;

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.util.function.Function;
/** /**
*/ */
public class Result<T> implements Comparable<Result<T>> public class Result<T> implements Comparable<Result<T>>
@ -42,6 +44,11 @@ public class Result<T> implements Comparable<Result<T>>
this.value = value; this.value = value;
} }
public <U> Result<U> map(Function<? super T, ? extends U> mapper)
{
return new Result<>(timestamp, mapper.apply(value));
}
@Override @Override
public int compareTo(Result<T> tResult) public int compareTo(Result<T> tResult)
{ {
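Result.map() composes naturally with the new BySegmentResultValueClass.mapResults(): together they re-type the per-segment payload while keeping the timestamp, segment id and interval intact, which is what getBySegmentServerResults() does below with the deserializing manipulator. A sketch with made-up data:

import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Result;
import java.util.Arrays;
import org.joda.time.DateTime;
import org.joda.time.Interval;

class ResultMapSketch
{
  public static void main(String[] args)
  {
    Result<BySegmentResultValueClass<String>> raw = new Result<>(
        new DateTime("2017-01-01"),
        new BySegmentResultValueClass<String>(
            Arrays.asList("1", "2"),
            "segment_1",
            new Interval("2017-01-01/2017-01-02")
        )
    );
    // Parse the per-segment values without touching the surrounding metadata.
    Result<BySegmentResultValueClass<Integer>> parsed =
        raw.map(bySegment -> bySegment.mapResults(Integer::parseInt));
    System.out.println(parsed);
  }
}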

@ -124,12 +124,9 @@ public class DimFilterUtils
if (dimFilter != null && shard != null) { if (dimFilter != null && shard != null) {
Map<String, Range<String>> domain = shard.getDomain(); Map<String, Range<String>> domain = shard.getDomain();
for (Map.Entry<String, Range<String>> entry : domain.entrySet()) { for (Map.Entry<String, Range<String>> entry : domain.entrySet()) {
Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache.get(entry.getKey()); String dimension = entry.getKey();
if (optFilterRangeSet == null) { Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
RangeSet<String> filterRangeSet = dimFilter.getDimensionRangeSet(entry.getKey()); .computeIfAbsent(dimension, d-> Optional.fromNullable(dimFilter.getDimensionRangeSet(d)));
optFilterRangeSet = Optional.fromNullable(filterRangeSet);
dimensionRangeCache.put(entry.getKey(), optFilterRangeSet);
}
if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) { if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) {
include = false; include = false;
} }
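Map.computeIfAbsent() replaces the get / null-check / put dance above, so each dimension's filter RangeSet is computed at most once per query, with a Guava Optional caching even the "no constraint" (null) case. A standalone sketch of the pattern; lookUpRangeSet is a hypothetical stand-in for dimFilter.getDimensionRangeSet():

import com.google.common.base.Optional;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import java.util.HashMap;
import java.util.Map;

class DimensionRangeCacheSketch
{
  private final Map<String, Optional<RangeSet<String>>> cache = new HashMap<>();

  Optional<RangeSet<String>> rangesFor(String dimension)
  {
    // Computed once per dimension; Optional.absent() is cached when there is no constraint.
    return cache.computeIfAbsent(dimension, d -> Optional.fromNullable(lookUpRangeSet(d)));
  }

  // Hypothetical lookup: returns null when the filter puts no constraint on the dimension.
  private RangeSet<String> lookUpRangeSet(String dimension)
  {
    if (!"dim1".equals(dimension)) {
      return null;
    }
    RangeSet<String> ranges = TreeRangeSet.create();
    ranges.add(Range.closed("a", "m"));
    return ranges;
  }
}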

@ -210,7 +210,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
makePreComputeManipulatorFn( makePreComputeManipulatorFn(
subquery, subquery,
MetricManipulatorFns.finalizing() MetricManipulatorFns.finalizing()
) )::apply
); );
} else { } else {
finalizingResults = subqueryResult; finalizingResults = subqueryResult;

@ -119,7 +119,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
makeOrdering(updatedQuery), makeOrdering(updatedQuery),
createMergeFn(updatedQuery) createMergeFn(updatedQuery)
), ),
MERGE_TRANSFORM_FN MERGE_TRANSFORM_FN::apply
); );
} }

@ -44,6 +44,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import javax.annotation.Nullable;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -291,6 +292,7 @@ public class BrokerServerView implements TimelineServerView
} }
@Nullable
@Override @Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource) public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{ {

@ -21,12 +21,9 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
@ -55,7 +52,6 @@ import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
@ -80,23 +76,22 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder; import io.druid.timeline.partition.PartitionHolder;
import io.druid.timeline.partition.ShardSpec;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.SortedMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/** /**
*/ */
@ -149,30 +144,33 @@ public class CachingClusteredClient implements QuerySegmentWalker
} }
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals( public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
final Query<T> query,
final Iterable<Interval> intervals
)
{ {
return new QueryRunner<T>() return new QueryRunner<T>()
{ {
@Override @Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext) public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{ {
return CachingClusteredClient.this.run( return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
queryPlus,
responseContext,
timeline -> timeline
);
} }
}; };
} }
@Override /**
public <T> QueryRunner<T> getQueryRunnerForSegments( * Run a query. The timelineConverter will be given the "master" timeline and can be used to return a different
final Query<T> query, * timeline, if desired. This is used by getQueryRunnerForSegments.
final Iterable<SegmentDescriptor> specs */
private <T> Sequence<T> run(
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext,
final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
) )
{
return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{ {
return new QueryRunner<T>() return new QueryRunner<T>()
{ {
@ -183,10 +181,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
queryPlus, queryPlus,
responseContext, responseContext,
timeline -> { timeline -> {
final List<TimelineObjectHolder<String, ServerSelector>> retVal = new LinkedList<>(); final VersionedIntervalTimeline<String, ServerSelector> timeline2 =
final VersionedIntervalTimeline<String, ServerSelector> timeline2 = new VersionedIntervalTimeline<>( new VersionedIntervalTimeline<>(Ordering.natural());
Ordering.natural()
);
for (SegmentDescriptor spec : specs) { for (SegmentDescriptor spec : specs) {
final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion()); final PartitionHolder<ServerSelector> entry = timeline.findEntry(spec.getInterval(), spec.getVersion());
if (entry != null) { if (entry != null) {
@ -204,60 +200,124 @@ public class CachingClusteredClient implements QuerySegmentWalker
} }
/** /**
* Run a query. The timelineFunction will be given the "master" timeline and can be used to return a different * This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. Its state and
* timeline, if desired. This is used by getQueryRunnerForSegments. * methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object
* being run, but the {@link QuerySegmentWalker} API is designed so that implementations should be able to accept
* arbitrary queries.
*/ */
private <T> Sequence<T> run( private class SpecificQueryRunnable<T>
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext,
final Function<TimelineLookup<String, ServerSelector>, TimelineLookup<String, ServerSelector>> timelineFunction
)
{ {
final Query<T> query = queryPlus.getQuery(); private final QueryPlus<T> queryPlus;
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query); private final Map<String, Object> responseContext;
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query); private final Query<T> query;
private final QueryToolChest<T, Query<T>> toolChest;
@Nullable
private final CacheStrategy<T, Object, Query<T>> strategy;
private final boolean useCache;
private final boolean populateCache;
private final boolean isBySegment;
private final int uncoveredIntervalsLimit;
private final Query<T> downstreamQuery;
private final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final Map<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap(); SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
this.queryPlus = queryPlus;
this.responseContext = responseContext;
this.query = queryPlus.getQuery();
this.toolChest = warehouse.getToolChest(query);
this.strategy = toolChest.getCacheStrategy(query);
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList(); this.useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap(); this.populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
this.isBySegment = QueryContexts.isBySegment(query);
final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); // Note that enabling this leads to putting uncovered intervals information in the response headers
final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); // and might blow up in some cases https://github.com/druid-io/druid/issues/2108
final boolean isBySegment = QueryContexts.isBySegment(query); this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query);
this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext());
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final int priority = QueryContexts.getPriority(query);
contextBuilder.put(QueryContexts.PRIORITY_KEY, priority);
if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true);
} }
TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); private ImmutableMap<String, Object> makeDownstreamQueryContext()
{
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
if (timeline == null) { final int priority = QueryContexts.getPriority(query);
return Sequences.empty(); contextBuilder.put(QueryContexts.PRIORITY_KEY, priority);
if (populateCache) {
// prevent down-stream nodes from caching results as well if we are populating the cache
contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true);
}
return contextBuilder.build();
} }
// build set of segments to query Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet(); {
@Nullable TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {
return Sequences.empty();
}
timeline = timelineConverter.apply(timeline);
if (uncoveredIntervalsLimit > 0) {
computeUncoveredIntervals(timeline);
}
timeline = timelineFunction.apply(timeline); final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline);
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = CachingClusteredClient.lookupIntervalsInTimeline( @Nullable final byte[] queryCacheKey = computeQueryCacheKey();
timeline, if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
query.getIntervals() @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
); @Nullable final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
if (currentEtag != null && currentEtag.equals(prevEtag)) {
return Sequences.empty();
}
}
// Note that enabling this leads to putting uncovered intervals information in the response headers final List<Pair<Interval, byte[]>> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments);
// and might blow up in some cases https://github.com/druid-io/druid/issues/2108 final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segments);
int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); return new LazySequence<>(() -> {
List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
return Sequences
.simple(sequencesByInterval)
.flatMerge(seq -> seq, query.getResultOrdering());
});
}
if (uncoveredIntervalsLimit > 0) { private Set<ServerToSegment> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit); {
final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
query,
query.getIntervals().stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
);
final Set<ServerToSegment> segments = Sets.newLinkedHashSet();
final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = Maps.newHashMap();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
final Set<PartitionChunk<ServerSelector>> filteredChunks = DimFilterUtils.filterShards(
query.getFilter(),
holder.getObject(),
partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(),
dimensionRangeCache
);
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
);
segments.add(new ServerToSegment(server, segment));
}
}
return segments;
}
private void computeUncoveredIntervals(TimelineLookup<String, ServerSelector> timeline)
{
final List<Interval> uncoveredIntervals = new ArrayList<>(uncoveredIntervalsLimit);
boolean uncoveredIntervalsOverflowed = false; boolean uncoveredIntervalsOverflowed = false;
for (Interval interval : query.getIntervals()) { for (Interval interval : query.getIntervals()) {
@ -296,391 +356,327 @@ public class CachingClusteredClient implements QuerySegmentWalker
} }
} }
// Let tool chest filter out unneeded segments @Nullable
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup = private byte[] computeQueryCacheKey()
toolChest.filterSegments(query, serversLookup); {
Map<String, Optional<RangeSet<String>>> dimensionRangeCache = Maps.newHashMap(); if ((populateCache || useCache) // implies strategy != null
&& !isBySegment) { // explicit bySegment queries are never cached
// Filter unneeded chunks based on partition dimension assert strategy != null;
for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) { return strategy.computeCacheKey(query);
final Set<PartitionChunk<ServerSelector>> filteredChunks = DimFilterUtils.filterShards( } else {
query.getFilter(), return null;
holder.getObject(),
new Function<PartitionChunk<ServerSelector>, ShardSpec>()
{
@Override
public ShardSpec apply(PartitionChunk<ServerSelector> input)
{
return input.getObject().getSegment().getShardSpec();
}
},
dimensionRangeCache
);
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
segments.add(Pair.of(selector, descriptor));
} }
} }
final byte[] queryCacheKey; @Nullable
private String computeCurrentEtag(final Set<ServerToSegment> segments, @Nullable byte[] queryCacheKey)
if ((populateCache || useCache) // implies strategy != null {
&& !isBySegment) /* explicit bySegment queries are never cached */ {
queryCacheKey = strategy.computeCacheKey(query);
} else {
queryCacheKey = null;
}
if (query.getContext().get(QueryResource.HDR_IF_NONE_MATCH) != null) {
String prevEtag = (String) query.getContext().get(QueryResource.HDR_IF_NONE_MATCH);
//compute current Etag
Hasher hasher = Hashing.sha1().newHasher(); Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true; boolean hasOnlyHistoricalSegments = true;
for (Pair<ServerSelector, SegmentDescriptor> p : segments) { for (ServerToSegment p : segments) {
if (!p.lhs.pick().getServer().segmentReplicatable()) { if (!p.getServer().pick().getServer().segmentReplicatable()) {
hasOnlyHistoricalSegments = false; hasOnlyHistoricalSegments = false;
break; break;
} }
hasher.putString(p.lhs.getSegment().getIdentifier(), Charsets.UTF_8); hasher.putString(p.getServer().getSegment().getIdentifier(), Charsets.UTF_8);
} }
if (hasOnlyHistoricalSegments) { if (hasOnlyHistoricalSegments) {
hasher.putBytes(queryCacheKey == null ? strategy.computeCacheKey(query) : queryCacheKey); hasher.putBytes(queryCacheKey == null ? strategy.computeCacheKey(query) : queryCacheKey);
String currEtag = Base64.encodeBase64String(hasher.hash().asBytes()); String currEtag = Base64.encodeBase64String(hasher.hash().asBytes());
responseContext.put(QueryResource.HDR_ETAG, currEtag); responseContext.put(QueryResource.HEADER_ETAG, currEtag);
if (prevEtag.equals(currEtag)) { return currEtag;
return Sequences.empty(); } else {
} return null;
} }
} }
if (queryCacheKey != null) { private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(
// cachKeys map must preserve segment ordering, in order for shards to always be combined in the same order final byte[] queryCacheKey,
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap(); final Set<ServerToSegment> segments
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) { )
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey( {
segment.lhs.getSegment().getIdentifier(), if (queryCacheKey == null) {
segment.rhs, return Collections.emptyList();
queryCacheKey
);
cacheKeys.put(segment, segmentCacheKey);
} }
final List<Pair<Interval, byte[]>> alreadyCachedResults = Lists.newArrayList();
Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey);
// Pull cached segments from cache and remove from set of segments to query // Pull cached segments from cache and remove from set of segments to query
final Map<Cache.NamedKey, byte[]> cachedValues; final Map<Cache.NamedKey, byte[]> cachedValues = computeCachedValues(perSegmentCacheKeys);
if (useCache) {
cachedValues = cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
} else {
cachedValues = ImmutableMap.of();
}
for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) { perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey(); final Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
Cache.NamedKey segmentCacheKey = entry.getValue();
final Interval segmentQueryInterval = segment.rhs.getInterval();
final byte[] cachedValue = cachedValues.get(segmentCacheKey); final byte[] cachedValue = cachedValues.get(segmentCacheKey);
if (cachedValue != null) { if (cachedValue != null) {
// remove cached segment from set of segments to query // remove cached segment from set of segments to query
segments.remove(segment); segments.remove(segment);
cachedResults.add(Pair.of(segmentQueryInterval, cachedValue)); alreadyCachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
} else if (populateCache) { } else if (populateCache) {
// otherwise, if populating cache, add segment to list of segments to cache // otherwise, if populating cache, add segment to list of segments to cache
final String segmentIdentifier = segment.lhs.getSegment().getIdentifier(); final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
cachePopulatorMap.put( addCachePopulator(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, segmentCacheKey)
);
} }
} });
return alreadyCachedResults;
} }
// Compile list of all segments not pulled from cache private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) { Set<ServerToSegment> segments,
final QueryableDruidServer queryableDruidServer = segment.lhs.pick(); byte[] queryCacheKey
)
{
// cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order
Map<ServerToSegment, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
for (ServerToSegment serverToSegment : segments) {
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
serverToSegment.getServer().getSegment().getIdentifier(),
serverToSegment.getSegmentDescriptor(),
queryCacheKey
);
cacheKeys.put(serverToSegment, segmentCacheKey);
}
return cacheKeys;
}
if (queryableDruidServer == null) { private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<ServerToSegment, Cache.NamedKey> cacheKeys)
log.makeAlert( {
"No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", if (useCache) {
segment.rhs, return cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit()));
query.getDataSource()
).emit();
} else { } else {
final DruidServer server = queryableDruidServer.getServer(); return ImmutableMap.of();
List<SegmentDescriptor> descriptors = serverSegments.get(server);
if (descriptors == null) {
descriptors = Lists.newArrayList();
serverSegments.put(server, descriptors);
}
descriptors.add(segment.rhs);
} }
} }
return new LazySequence<>( private void addCachePopulator(
new Supplier<Sequence<T>>() Cache.NamedKey segmentCacheKey,
{ String segmentIdentifier,
@Override Interval segmentQueryInterval
public Sequence<T> get() )
{ {
ArrayList<Sequence<T>> sequencesByInterval = Lists.newArrayList(); cachePopulatorMap.put(
addSequencesFromCache(sequencesByInterval); StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
addSequencesFromServer(sequencesByInterval); new CachePopulator(cache, objectMapper, segmentCacheKey)
);
}
return mergeCachedAndUncachedSequences(query, sequencesByInterval); @Nullable
} private CachePopulator getCachePopulator(String segmentId, Interval segmentInterval)
{
return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
}
private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences) private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments)
{ {
if (strategy == null) { final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = Maps.newTreeMap();
return; for (ServerToSegment serverToSegment : segments) {
} final QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick();
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(); if (queryableDruidServer == null) {
final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz(); log.makeAlert(
for (Pair<Interval, byte[]> cachedResultPair : cachedResults) { "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!",
final byte[] cachedResult = cachedResultPair.rhs; serverToSegment.getSegmentDescriptor(),
Sequence<Object> cachedSequence = new BaseSequence<>( query.getDataSource()
new BaseSequence.IteratorMaker<Object, Iterator<Object>>() ).emit();
{ } else {
@Override final DruidServer server = queryableDruidServer.getServer();
public Iterator<Object> make() serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(serverToSegment.getSegmentDescriptor());
{ }
try { }
if (cachedResult.length == 0) { return serverSegments;
return Iterators.emptyIterator(); }
}
return objectMapper.readValues( private void addSequencesFromCache(
objectMapper.getFactory().createParser(cachedResult), final List<Sequence<T>> listOfSequences,
cacheObjectClazz final List<Pair<Interval, byte[]>> cachedResults
); )
} {
catch (IOException e) { if (strategy == null) {
throw Throwables.propagate(e); return;
} }
}
@Override final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache();
public void cleanup(Iterator<Object> iterFromMake) final TypeReference<Object> cacheObjectClazz = strategy.getCacheObjectClazz();
{ for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
} final byte[] cachedResult = cachedResultPair.rhs;
Sequence<Object> cachedSequence = new BaseSequence<>(
new BaseSequence.IteratorMaker<Object, Iterator<Object>>()
{
@Override
public Iterator<Object> make()
{
try {
if (cachedResult.length == 0) {
return Iterators.emptyIterator();
} }
);
listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction));
}
}
private void addSequencesFromServer(ArrayList<Sequence<T>> listOfSequences) return objectMapper.readValues(
{ objectMapper.getFactory().createParser(cachedResult),
listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size()); cacheObjectClazz
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
// Loop through each server, setting up the query and initiating it.
// The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
final DruidServer server = entry.getKey();
final List<SegmentDescriptor> descriptors = entry.getValue();
final QueryRunner clientQueryable = serverView.getQueryRunner(server);
if (clientQueryable == null) {
log.error("WTF!? server[%s] doesn't have a client Queryable?", server);
continue;
}
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
final Sequence<T> resultSeqToAdd;
if (!server.segmentReplicatable() || !populateCache || isBySegment) { // Direct server queryable
if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
// bySegment queries need to be de-serialized, see DirectDruidClient.run()
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultSequence = clientQueryable.run(
queryPlus.withQuerySegmentSpec(segmentSpec),
responseContext
);
resultSeqToAdd = (Sequence) Sequences.map(
resultSequence,
new Function<Result<BySegmentResultValueClass<T>>, Result<BySegmentResultValueClass<T>>>()
{
@Override
public Result<BySegmentResultValueClass<T>> apply(Result<BySegmentResultValueClass<T>> input)
{
final BySegmentResultValueClass<T> bySegmentValue = input.getValue();
return new Result<>(
input.getTimestamp(),
new BySegmentResultValueClass<T>(
Lists.transform(
bySegmentValue.getResults(),
toolChest.makePreComputeManipulatorFn(
query,
MetricManipulatorFns.deserializing()
)
),
bySegmentValue.getSegmentId(),
bySegmentValue.getInterval()
)
);
}
}
); );
} }
} else { // Requires some manipulation on broker side catch (IOException e) {
@SuppressWarnings("unchecked") throw new RuntimeException(e);
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryable.run( }
queryPlus.withQuery(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
responseContext
);
resultSeqToAdd = new MergeSequence(
query.getResultOrdering(),
Sequences.<Result<BySegmentResultValueClass<T>>, Sequence<T>>map(
runningSequence,
new Function<Result<BySegmentResultValueClass<T>>, Sequence<T>>()
{
private final Function<T, Object> cacheFn = strategy.prepareForCache();
// Acctually do something with the results
@Override
public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input)
{
final BySegmentResultValueClass<T> value = input.getValue();
final CachePopulator cachePopulator = cachePopulatorMap.get(
StringUtils.format("%s_%s", value.getSegmentId(), value.getInterval())
);
final Queue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
return Sequences.<T>withEffect(
Sequences.<T, T>map(
Sequences.<T, T>map(
Sequences.<T>simple(value.getResults()),
new Function<T, T>()
{
@Override
public T apply(final T input)
{
if (cachePopulator != null) {
// only compute cache data if populating cache
cacheFutures.add(
backgroundExecutorService.submit(
new Callable<Object>()
{
@Override
public Object call()
{
return cacheFn.apply(input);
}
}
)
);
}
return input;
}
}
),
toolChest.makePreComputeManipulatorFn(
// Ick... most makePreComputeManipulatorFn directly cast to their ToolChest query type of choice
// This casting is sub-optimal, but hasn't caused any major problems yet...
(Query) rewrittenQuery,
MetricManipulatorFns.deserializing()
)
),
new Runnable()
{
@Override
public void run()
{
if (cachePopulator != null) {
Futures.addCallback(
Futures.allAsList(cacheFutures),
new FutureCallback<List<Object>>()
{
@Override
public void onSuccess(List<Object> cacheData)
{
cachePopulator.populate(cacheData);
// Help out GC by making sure all references are gone
cacheFutures.clear();
}
@Override
public void onFailure(Throwable throwable)
{
log.error(throwable, "Background caching failed");
}
},
backgroundExecutorService
);
}
}
},
MoreExecutors.sameThreadExecutor()
);// End withEffect
}
}
)
);
} }
listOfSequences.add(resultSeqToAdd); @Override
public void cleanup(Iterator<Object> iterFromMake)
{
}
} }
} );
}// End of Supplier listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction));
); }
}
private static <VersionType, ObjectType> List<TimelineObjectHolder<VersionType, ObjectType>> lookupIntervalsInTimeline(
final TimelineLookup<VersionType, ObjectType> timeline,
final Iterable<Interval> intervals
)
{
return StreamSupport.stream(intervals.spliterator(), false)
.flatMap(interval -> StreamSupport.stream(timeline.lookup(interval).spliterator(), false))
.collect(Collectors.toList());
}
@VisibleForTesting
static <T> Sequence<T> mergeCachedAndUncachedSequences(
Query<T> query,
List<Sequence<T>> sequencesByInterval
)
{
if (sequencesByInterval.isEmpty()) {
return Sequences.empty();
} }
return new MergeSequence<>( private void addSequencesFromServer(
query.getResultOrdering(), final List<Sequence<T>> listOfSequences,
Sequences.simple(sequencesByInterval) final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer
); )
{
segmentsByServer.forEach((server, segmentsOfServer) -> {
final QueryRunner serverRunner = serverView.getQueryRunner(server);
if (serverRunner == null) {
log.error("Server[%s] doesn't have a query runner", server);
return;
}
final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer);
Sequence<T> serverResults;
if (isBySegment) {
serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec);
} else if (!server.segmentReplicatable() || !populateCache) {
serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec);
} else {
serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec);
}
listOfSequences.add(serverResults);
});
}
@SuppressWarnings("unchecked")
private Sequence<T> getBySegmentServerResults(
final QueryRunner serverRunner,
final MultipleSpecificSegmentSpec segmentsOfServerSpec
)
{
Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner
.run(queryPlus.withQuerySegmentSpec(segmentsOfServerSpec), responseContext);
// bySegment results need to be de-serialized, see DirectDruidClient.run()
return (Sequence<T>) resultsBySegments
.map(result -> result.map(
resultsOfSegment -> resultsOfSegment.mapResults(
toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing())::apply
)
));
}
@SuppressWarnings("unchecked")
private Sequence<T> getSimpleServerResults(
final QueryRunner serverRunner,
final MultipleSpecificSegmentSpec segmentsOfServerSpec
)
{
return serverRunner.run(queryPlus.withQuerySegmentSpec(segmentsOfServerSpec), responseContext);
}
private Sequence<T> getAndCacheServerResults(
final QueryRunner serverRunner,
final MultipleSpecificSegmentSpec segmentsOfServerSpec
)
{
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultsBySegments = serverRunner.run(
queryPlus
.withQuery((Query<Result<BySegmentResultValueClass<T>>>) downstreamQuery)
.withQuerySegmentSpec(segmentsOfServerSpec),
responseContext
);
final Function<T, Object> cacheFn = strategy.prepareForCache();
return resultsBySegments
.map(result -> {
final BySegmentResultValueClass<T> resultsOfSegment = result.getValue();
final CachePopulator cachePopulator =
getCachePopulator(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
Sequence<T> res = Sequences
.simple(resultsOfSegment.getResults())
.map(r -> {
if (cachePopulator != null) {
// only compute cache data if populating cache
cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() -> cacheFn.apply(r)));
}
return r;
})
.map(
toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply
);
if (cachePopulator != null) {
res = res.withEffect(cachePopulator::populate, MoreExecutors.sameThreadExecutor());
}
return res;
})
.flatMerge(seq -> seq, query.getResultOrdering());
}
} }
private static class CachePopulator private static class ServerToSegment extends Pair<ServerSelector, SegmentDescriptor>
{
private ServerToSegment(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
}
ServerSelector getServer()
{
return lhs;
}
SegmentDescriptor getSegmentDescriptor()
{
return rhs;
}
}
private class CachePopulator
{ {
private final Cache cache; private final Cache cache;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final Cache.NamedKey key; private final Cache.NamedKey key;
private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key) CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
{ {
this.cache = cache; this.cache = cache;
this.mapper = mapper; this.mapper = mapper;
this.key = key; this.key = key;
} }
public void populate(Iterable<Object> results) public void populate()
{ {
CacheUtil.populate(cache, mapper, key, results); Futures.addCallback(
Futures.allAsList(cacheFutures),
new FutureCallback<List<Object>>()
{
@Override
public void onSuccess(List<Object> cacheData)
{
CacheUtil.populate(cache, mapper, key, cacheData);
// Help out GC by making sure all references are gone
cacheFutures.clear();
}
@Override
public void onFailure(Throwable throwable)
{
log.error(throwable, "Background caching failed");
}
},
backgroundExecutorService
);
} }
} }
} }
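computeCurrentEtag() above hashes the identifiers of the queried segments (historical-only) together with the query cache key and Base64-encodes the digest. A standalone sketch of that computation with made-up segment identifiers:

import com.google.common.base.Charsets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.codec.binary.Base64;

class EtagSketch
{
  static String computeEtag(List<String> segmentIdentifiers, byte[] queryCacheKey)
  {
    Hasher hasher = Hashing.sha1().newHasher();
    for (String identifier : segmentIdentifiers) {
      hasher.putString(identifier, Charsets.UTF_8);
    }
    hasher.putBytes(queryCacheKey);
    return Base64.encodeBase64String(hasher.hash().asBytes());
  }

  public static void main(String[] args)
  {
    // Identical inputs always produce the same ETag, so the broker can answer 304 Not Modified.
    String etag = computeEtag(
        Arrays.asList("wiki_2017-01-01_v1_0", "wiki_2017-01-02_v1_0"),
        new byte[]{0x01, 0x02}
    );
    System.out.println(etag);
  }
}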

@ -26,12 +26,14 @@ import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
*/ */
public interface TimelineServerView extends ServerView public interface TimelineServerView extends ServerView
{ {
@Nullable
TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource); TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
<T> QueryRunner<T> getQueryRunner(DruidServer server); <T> QueryRunner<T> getQueryRunner(DruidServer server);

@ -96,8 +96,8 @@ public class QueryResource implements QueryCountStatsProvider
protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024; protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;
public static final String HDR_IF_NONE_MATCH = "If-None-Match"; public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
public static final String HDR_ETAG = "ETag"; public static final String HEADER_ETAG = "ETag";
protected final QueryToolChestWarehouse warehouse; protected final QueryToolChestWarehouse warehouse;
protected final ServerConfig config; protected final ServerConfig config;
@ -235,16 +235,16 @@ public class QueryResource implements QueryCountStatsProvider
} }
} }
String prevEtag = req.getHeader(HDR_IF_NONE_MATCH); String prevEtag = req.getHeader(HEADER_IF_NONE_MATCH);
if (prevEtag != null) { if (prevEtag != null) {
query = query.withOverriddenContext( query = query.withOverriddenContext(
ImmutableMap.of (HDR_IF_NONE_MATCH, prevEtag) ImmutableMap.of (HEADER_IF_NONE_MATCH, prevEtag)
); );
} }
final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext);
if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) { if (prevEtag != null && prevEtag.equals(responseContext.get(HEADER_ETAG))) {
return Response.notModified().build(); return Response.notModified().build();
} }
@ -347,9 +347,9 @@ public class QueryResource implements QueryCountStatsProvider
) )
.header("X-Druid-Query-Id", queryId); .header("X-Druid-Query-Id", queryId);
if (responseContext.get(HDR_ETAG) != null) { if (responseContext.get(HEADER_ETAG) != null) {
builder.header(HDR_ETAG, responseContext.get(HDR_ETAG)); builder.header(HEADER_ETAG, responseContext.get(HEADER_ETAG));
responseContext.remove(HDR_ETAG); responseContext.remove(HEADER_ETAG);
} }
responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME);
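With the constants renamed to HEADER_IF_NONE_MATCH and HEADER_ETAG, the conditional-request flow itself is unchanged: when the client's If-None-Match value equals the ETag placed in the response context, the broker answers 304 with no body. A minimal JAX-RS sketch of that comparison; the method and strings are illustrative only:

import javax.ws.rs.core.Response;

class EtagShortCircuitSketch
{
  static Response respond(String prevEtag, String currentEtag)
  {
    if (prevEtag != null && prevEtag.equals(currentEtag)) {
      // The client already has the current results; skip the body entirely.
      return Response.notModified().build();
    }
    return Response.ok("full query results would go here").build();
  }
}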

@ -956,7 +956,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
), ),
client.mergeCachedAndUncachedSequences( mergeSequences(
new TopNQueryBuilder() new TopNQueryBuilder()
.dataSource("test") .dataSource("test")
.intervals("2011-01-06/2011-01-10") .intervals("2011-01-06/2011-01-10")
@ -970,6 +970,11 @@ public class CachingClusteredClientTest
); );
} }
private static <T> Sequence<T> mergeSequences(Query<T> query, List<Sequence<T>> sequences)
{
return Sequences.simple(sequences).flatMerge(seq -> seq, query.getResultOrdering());
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

@ -128,7 +128,7 @@ public class QueryResourceTest
public void setup() public void setup()
{ {
EasyMock.expect(testServletRequest.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes(); EasyMock.expect(testServletRequest.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.expect(testServletRequest.getHeader(QueryResource.HDR_IF_NONE_MATCH)).andReturn(null).anyTimes(); EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes(); EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryManager = new QueryManager(); queryManager = new QueryManager();
queryResource = new QueryResource( queryResource = new QueryResource(