refactor filterSegments to use TimelineObjectHolders

This commit is contained in:
xvrl 2013-02-11 18:40:14 -08:00
parent 6b87ef2921
commit d4009c8c1c
3 changed files with 34 additions and 45 deletions

View File

@@ -139,49 +139,26 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// build set of segments to query
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
for (Interval interval : rewrittenQuery.getIntervals()) {
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = timeline.lookup(interval);
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
segments.add(Pair.of(selector, descriptor));
}
}
serversLookup.addAll(timeline.lookup(interval));
}
// Let tool chest filter out unneeded segments
final Set<SegmentDescriptor> filteredSegmentDescriptors = Sets.newLinkedHashSet(toolChest.filterSegments(
query,
Iterables.transform(
segments, new Function<Pair<ServerSelector, SegmentDescriptor>, SegmentDescriptor>()
{
@Override
public SegmentDescriptor apply(
@Nullable Pair<ServerSelector, SegmentDescriptor> input
)
{
return input.rhs;
}
}
)
));
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
toolChest.filterSegments(query, serversLookup);
// remove unneeded segments from list of segments to query
segments = Sets.newLinkedHashSet(Iterables.filter(segments, new Predicate<Pair<ServerSelector, SegmentDescriptor>>()
{
@Override
public boolean apply(
@Nullable Pair<ServerSelector, SegmentDescriptor> input
)
{
return filteredSegmentDescriptors.contains(input.rhs);
for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
segments.add(Pair.of(selector, descriptor));
}
}));
}
final byte[] queryCacheKey;
if(strategy != null) {

View File

@@ -23,12 +23,16 @@ import com.google.common.base.Function;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.Interval;
import java.util.List;
/**
* The broker-side (also used by server in some cases) API for a specific Query type. This API is still undergoing
* evolution and is only semi-stable, so proprietary Query implementations should be ready for the potential
@@ -61,7 +65,10 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
return runner;
}
public Iterable<SegmentDescriptor> filterSegments(QueryType query, Iterable<SegmentDescriptor> intervals) {
return intervals;
public List<TimelineObjectHolder<String, ServerSelector>> filterSegments(
QueryType query,
List<TimelineObjectHolder<String, ServerSelector>> segments
) {
return segments;
}
}

View File

@@ -29,6 +29,8 @@ import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.collect.OrderedMergeSequence;
import com.metamx.druid.query.BySegmentSkippingQueryRunner;
import com.metamx.druid.query.CacheStrategy;
@@ -64,16 +66,19 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public Iterable<SegmentDescriptor> filterSegments(TimeBoundaryQuery query, Iterable<SegmentDescriptor> input) {
public List<TimelineObjectHolder<String, ServerSelector>> filterSegments(
TimeBoundaryQuery query,
List<TimelineObjectHolder<String, ServerSelector>> input
) {
long minMillis = Long.MAX_VALUE;
long maxMillis = Long.MIN_VALUE;
SegmentDescriptor min = null;
SegmentDescriptor max = null;
TimelineObjectHolder<String, ServerSelector> min = null;
TimelineObjectHolder<String, ServerSelector> max = null;
// keep track of all segments in a given shard
Map<String, Set<SegmentDescriptor>> segmentGroups = Maps.newHashMap();
Map<String, Set<TimelineObjectHolder<String, ServerSelector>>> segmentGroups = Maps.newHashMap();
for(SegmentDescriptor e : input) {
for(TimelineObjectHolder<String, ServerSelector> e : input) {
final long start = e.getInterval().getStartMillis();
final long end = e.getInterval().getEndMillis();
final String version = e.getVersion();
@@ -94,7 +99,7 @@ public class TimeBoundaryQueryQueryToolChest
}
}
return Sets.union(segmentGroups.get(min.getVersion()), segmentGroups.get(max.getVersion()));
return Lists.newArrayList(Sets.union(segmentGroups.get(min.getVersion()), segmentGroups.get(max.getVersion())));
}
@Override