address cr

This commit is contained in:
fjy 2014-06-16 14:20:16 -07:00
parent 08c88e8fb7
commit 9ca4f564ee
3 changed files with 34 additions and 53 deletions

View File

@ -147,10 +147,10 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList(); List<Result<TimeBoundaryResultValue>> results = Lists.newArrayList();
Map<String, Object> result = Maps.newHashMap(); Map<String, Object> result = Maps.newHashMap();
if (min != null && !bound.equalsIgnoreCase(MAX_TIME)) { if (min != null) {
result.put(MIN_TIME, min); result.put(MIN_TIME, min);
} }
if (max != null && !bound.equalsIgnoreCase(MIN_TIME)) { if (max != null) {
result.put(MAX_TIME, max); result.put(MAX_TIME, max);
} }
if (!result.isEmpty()) { if (!result.isEmpty()) {
@ -171,17 +171,13 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
for (Result<TimeBoundaryResultValue> result : results) { for (Result<TimeBoundaryResultValue> result : results) {
TimeBoundaryResultValue val = result.getValue(); TimeBoundaryResultValue val = result.getValue();
if (!bound.equalsIgnoreCase(MAX_TIME)) { DateTime currMinTime = val.getMinTime();
DateTime currMinTime = val.getMinTime(); if (currMinTime != null && currMinTime.isBefore(min)) {
if (currMinTime.isBefore(min)) { min = currMinTime;
min = currMinTime;
}
} }
if (!bound.equalsIgnoreCase(MIN_TIME)) { DateTime currMaxTime = val.getMaxTime();
DateTime currMaxTime = val.getMaxTime(); if (currMaxTime != null && currMaxTime.isAfter(max)) {
if (currMaxTime.isAfter(max)) { max = currMaxTime;
max = currMaxTime;
}
} }
} }

View File

@ -70,42 +70,18 @@ public class TimeBoundaryQueryQueryToolChest
final T min = segments.get(0); final T min = segments.get(0);
final T max = segments.get(segments.size() - 1); final T max = segments.get(segments.size() - 1);
final Predicate<T> filterPredicate;
// optimizations to avoid hitting too many segments
if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)) {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(min.getInterval());
}
};
} else if (query.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)) {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(max.getInterval());
}
};
} else {
filterPredicate = new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return input.getInterval().overlaps(min.getInterval()) || input.getInterval()
.overlaps(max.getInterval());
}
};
}
return Lists.newArrayList( return Lists.newArrayList(
Iterables.filter( Iterables.filter(
segments, segments,
filterPredicate new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return (min != null && input.getInterval().overlaps(min.getInterval())) ||
(max != null && input.getInterval().overlaps(max.getInterval()));
}
}
) )
); );
} }
@ -135,7 +111,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences) public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{ {
return new OrderedMergeSequence<Result<TimeBoundaryResultValue>>(getOrdering(), seqOfSequences); return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
} }
@Override @Override
@ -201,11 +177,11 @@ public class TimeBoundaryQueryQueryToolChest
{ {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Result<TimeBoundaryResultValue> apply(@Nullable Object input) public Result<TimeBoundaryResultValue> apply(Object input)
{ {
List<Object> result = (List<Object>) input; List<Object> result = (List<Object>) input;
return new Result<TimeBoundaryResultValue>( return new Result<>(
new DateTime(result.get(0)), new DateTime(result.get(0)),
new TimeBoundaryResultValue(result.get(1)) new TimeBoundaryResultValue(result.get(1))
); );
@ -216,7 +192,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override @Override
public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences) public Sequence<Result<TimeBoundaryResultValue>> mergeSequences(Sequence<Sequence<Result<TimeBoundaryResultValue>>> seqOfSequences)
{ {
return new MergeSequence<Result<TimeBoundaryResultValue>>(getOrdering(), seqOfSequences); return new MergeSequence<>(getOrdering(), seqOfSequences);
} }
}; };
} }

View File

@ -32,6 +32,7 @@ import io.druid.query.QueryWatcher;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter; import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -61,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory
ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners
) )
{ {
return new ChainedExecutionQueryRunner<Result<TimeBoundaryResultValue>>( return new ChainedExecutionQueryRunner<>(
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
); );
} }
@ -90,7 +91,7 @@ public class TimeBoundaryQueryRunnerFactory
final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input; final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input;
return new BaseSequence<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>( return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>() new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>()
{ {
@Override @Override
@ -102,10 +103,18 @@ public class TimeBoundaryQueryRunnerFactory
); );
} }
final DateTime minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
final DateTime maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();
return legacyQuery.buildResult( return legacyQuery.buildResult(
adapter.getInterval().getStart(), adapter.getInterval().getStart(),
adapter.getMinTime(), minTime,
adapter.getMaxTime() maxTime
).iterator(); ).iterator();
} }