mirror of https://github.com/apache/druid.git
Fix query by segment
* Changed topN queries to use joda Interval instead of string values * topN by segment now implements BySegmentResultValue<Result<TopNResultValue>> instead of BySegmentResultValue<TopNResultValue> * Added a unit test which failed uner the prior implementation.
This commit is contained in:
parent
10b7ca9fa9
commit
31fed7d329
|
@ -19,15 +19,19 @@
|
||||||
|
|
||||||
package io.druid.query;
|
package io.druid.query;
|
||||||
|
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public interface BySegmentResultValue<T>
|
public interface BySegmentResultValue<T>
|
||||||
{
|
{
|
||||||
public List<Result<T>> getResults();
|
public List<T> getResults();
|
||||||
|
|
||||||
public String getSegmentId();
|
public String getSegmentId();
|
||||||
|
|
||||||
public String getIntervalString();
|
public String getIntervalString();
|
||||||
|
|
||||||
|
public Interval getInterval();
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class BySegmentResultValueClass<T>
|
public class BySegmentResultValueClass<T> implements BySegmentResultValue<T>
|
||||||
{
|
{
|
||||||
private final List<T> results;
|
private final List<T> results;
|
||||||
private final String segmentId;
|
private final String segmentId;
|
||||||
|
@ -43,18 +43,27 @@ public class BySegmentResultValueClass<T>
|
||||||
this.interval = interval;
|
this.interval = interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@JsonProperty("results")
|
@JsonProperty("results")
|
||||||
public List<T> getResults()
|
public List<T> getResults()
|
||||||
{
|
{
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@JsonProperty("segment")
|
@JsonProperty("segment")
|
||||||
public String getSegmentId()
|
public String getSegmentId()
|
||||||
{
|
{
|
||||||
return segmentId;
|
return segmentId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getIntervalString()
|
||||||
|
{
|
||||||
|
return interval.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
@JsonProperty("interval")
|
@JsonProperty("interval")
|
||||||
public Interval getInterval()
|
public Interval getInterval()
|
||||||
{
|
{
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
||||||
throw new ISE("Cannot have a null result!");
|
throw new ISE("Cannot have a null result!");
|
||||||
}
|
}
|
||||||
|
|
||||||
BySegmentResultValueClass<T> resultsClass = result.getValue();
|
BySegmentResultValue<T> resultsClass = result.getValue();
|
||||||
|
|
||||||
return (T) new Result<BySegmentResultValueClass>(
|
return (T) new Result<BySegmentResultValueClass>(
|
||||||
result.getTimestamp(),
|
result.getTimestamp(),
|
||||||
|
|
|
@ -24,28 +24,30 @@ import com.fasterxml.jackson.annotation.JsonValue;
|
||||||
import io.druid.query.BySegmentResultValue;
|
import io.druid.query.BySegmentResultValue;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
import io.druid.query.search.search.SearchHit;
|
import io.druid.query.search.search.SearchHit;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class BySegmentSearchResultValue extends SearchResultValue implements BySegmentResultValue<SearchResultValue>
|
public class BySegmentSearchResultValue extends SearchResultValue
|
||||||
|
implements BySegmentResultValue<Result<SearchResultValue>>
|
||||||
{
|
{
|
||||||
private final List<Result<SearchResultValue>> results;
|
private final List<Result<SearchResultValue>> results;
|
||||||
private final String segmentId;
|
private final String segmentId;
|
||||||
private final String intervalString;
|
private final Interval interval;
|
||||||
|
|
||||||
public BySegmentSearchResultValue(
|
public BySegmentSearchResultValue(
|
||||||
@JsonProperty("results") List<Result<SearchResultValue>> results,
|
@JsonProperty("results") List<Result<SearchResultValue>> results,
|
||||||
@JsonProperty("segment") String segmentId,
|
@JsonProperty("segment") String segmentId,
|
||||||
@JsonProperty("interval") String intervalString
|
@JsonProperty("interval") Interval interval
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(null);
|
super(null);
|
||||||
|
|
||||||
this.results = results;
|
this.results = results;
|
||||||
this.segmentId = segmentId;
|
this.segmentId = segmentId;
|
||||||
this.intervalString = intervalString;
|
this.interval = interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -70,10 +72,16 @@ public class BySegmentSearchResultValue extends SearchResultValue implements ByS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty("interval")
|
|
||||||
public String getIntervalString()
|
public String getIntervalString()
|
||||||
{
|
{
|
||||||
return intervalString;
|
return interval.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty("interval")
|
||||||
|
public Interval getInterval()
|
||||||
|
{
|
||||||
|
return interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -82,7 +90,7 @@ public class BySegmentSearchResultValue extends SearchResultValue implements ByS
|
||||||
return "BySegmentSearchResultValue{" +
|
return "BySegmentSearchResultValue{" +
|
||||||
"results=" + results +
|
"results=" + results +
|
||||||
", segmentId='" + segmentId + '\'' +
|
", segmentId='" + segmentId + '\'' +
|
||||||
", intervalString='" + intervalString + '\'' +
|
", interval='" + interval.toString() + '\'' +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -329,7 +329,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
value.getSegmentId(),
|
value.getSegmentId(),
|
||||||
value.getIntervalString()
|
value.getInterval()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,30 +23,32 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonValue;
|
import com.fasterxml.jackson.annotation.JsonValue;
|
||||||
import io.druid.query.BySegmentResultValue;
|
import io.druid.query.BySegmentResultValue;
|
||||||
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class BySegmentTopNResultValue extends TopNResultValue implements BySegmentResultValue<TopNResultValue>
|
public class BySegmentTopNResultValue extends TopNResultValue implements BySegmentResultValue<Result<TopNResultValue>>
|
||||||
{
|
{
|
||||||
private final List<Result<TopNResultValue>> results;
|
private final List<Result<TopNResultValue>> results;
|
||||||
private final String segmentId;
|
private final String segmentId;
|
||||||
private final String intervalString;
|
private final Interval interval;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public BySegmentTopNResultValue(
|
public BySegmentTopNResultValue(
|
||||||
@JsonProperty("results") List<Result<TopNResultValue>> results,
|
@JsonProperty("results") List<Result<TopNResultValue>> results,
|
||||||
@JsonProperty("segment") String segmentId,
|
@JsonProperty("segment") String segmentId,
|
||||||
@JsonProperty("interval") String intervalString
|
@JsonProperty("interval") Interval interval
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(null);
|
super(null);
|
||||||
|
|
||||||
this.results = results;
|
this.results = results;
|
||||||
this.segmentId = segmentId;
|
this.segmentId = segmentId;
|
||||||
this.intervalString = intervalString;
|
this.interval = interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -72,10 +74,16 @@ public class BySegmentTopNResultValue extends TopNResultValue implements BySegme
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@JsonProperty("interval")
|
|
||||||
public String getIntervalString()
|
public String getIntervalString()
|
||||||
{
|
{
|
||||||
return intervalString;
|
return interval.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonProperty("interval")
|
||||||
|
public Interval getInterval()
|
||||||
|
{
|
||||||
|
return interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -84,7 +92,7 @@ public class BySegmentTopNResultValue extends TopNResultValue implements BySegme
|
||||||
return "BySegmentTopNResultValue{" +
|
return "BySegmentTopNResultValue{" +
|
||||||
"results=" + results +
|
"results=" + results +
|
||||||
", segmentId='" + segmentId + '\'' +
|
", segmentId='" + segmentId + '\'' +
|
||||||
", intervalString='" + intervalString + '\'' +
|
", interval='" + interval.toString() + '\'' +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import com.metamx.common.guava.nary.BinaryFn;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import io.druid.collections.OrderedMergeSequence;
|
import io.druid.collections.OrderedMergeSequence;
|
||||||
import io.druid.granularity.QueryGranularity;
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.query.BySegmentResultValue;
|
||||||
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
import io.druid.query.CacheStrategy;
|
import io.druid.query.CacheStrategy;
|
||||||
import io.druid.query.IntervalChunkingQueryRunner;
|
import io.druid.query.IntervalChunkingQueryRunner;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
|
@ -433,7 +435,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
|
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
|
||||||
{
|
{
|
||||||
if (isBySegment) {
|
if (isBySegment) {
|
||||||
BySegmentTopNResultValue value = (BySegmentTopNResultValue) input.getValue();
|
BySegmentResultValue<Result<TopNResultValue>> value = (BySegmentResultValue<Result<TopNResultValue>>) input
|
||||||
|
.getValue();
|
||||||
|
|
||||||
return new Result<TopNResultValue>(
|
return new Result<TopNResultValue>(
|
||||||
input.getTimestamp(),
|
input.getTimestamp(),
|
||||||
|
@ -460,7 +463,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
value.getSegmentId(),
|
value.getSegmentId(),
|
||||||
value.getIntervalString()
|
value.getInterval()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,12 @@
|
||||||
|
|
||||||
package io.druid.query.topn;
|
package io.druid.query.topn;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.collections.StupidPool;
|
import io.druid.collections.StupidPool;
|
||||||
import io.druid.query.BySegmentResultValueClass;
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
|
@ -47,6 +49,7 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -301,6 +304,84 @@ public class TopNQueryRunnerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTopNBySegment()
|
||||||
|
{
|
||||||
|
|
||||||
|
final HashMap<String, Object> specialContext = new HashMap<String, Object>();
|
||||||
|
specialContext.put("bySegment", "true");
|
||||||
|
TopNQuery query = new TopNQueryBuilder()
|
||||||
|
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.granularity(QueryRunnerTestHelper.allGran)
|
||||||
|
.dimension(marketDimension)
|
||||||
|
.metric(QueryRunnerTestHelper.indexMetric)
|
||||||
|
.threshold(4)
|
||||||
|
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||||
|
.context(specialContext)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TopNResultValue>(
|
||||||
|
new DateTime("2011-04-01T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Arrays.<Map<String, Object>>asList(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"addRowsIndexConstant", 5356.814697265625D,
|
||||||
|
"index", 5351.814697265625D,
|
||||||
|
marketDimension, "total_market",
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_2,
|
||||||
|
"rows", 4L
|
||||||
|
),
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"addRowsIndexConstant", 4880.669677734375D,
|
||||||
|
"index", 4875.669677734375D,
|
||||||
|
marketDimension, "upfront",
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_2,
|
||||||
|
"rows", 4L
|
||||||
|
),
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"addRowsIndexConstant", 2250.8768157958984D,
|
||||||
|
"index", 2231.8768157958984D,
|
||||||
|
marketDimension, "spot",
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_9,
|
||||||
|
"rows", 18L
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Sequence<Result<TopNResultValue>> results = new TopNQueryQueryToolChest(new TopNQueryConfig()).postMergeQueryDecoration(
|
||||||
|
runner
|
||||||
|
).run(
|
||||||
|
query,
|
||||||
|
specialContext
|
||||||
|
);
|
||||||
|
List<Result<BySegmentTopNResultValue>> resultList = Sequences.toList(
|
||||||
|
Sequences.map(
|
||||||
|
results, new Function<Result<TopNResultValue>, Result<BySegmentTopNResultValue>>()
|
||||||
|
{
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Result<BySegmentTopNResultValue> apply(
|
||||||
|
Result<TopNResultValue> input
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new Result<BySegmentTopNResultValue>(
|
||||||
|
input.getTimestamp(),
|
||||||
|
(BySegmentTopNResultValue) input.getValue()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
Lists.<Result<BySegmentTopNResultValue>>newArrayList()
|
||||||
|
);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, resultList.get(0).getValue().getResults());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTopN()
|
public void testTopN()
|
||||||
{
|
{
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
||||||
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
|
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
|
||||||
toolChest.postMergeQueryDecoration(
|
toolChest.postMergeQueryDecoration(
|
||||||
toolChest.mergeResults(
|
toolChest.mergeResults(
|
||||||
new UnionQueryRunner<>(
|
new UnionQueryRunner<T>(
|
||||||
new MetricsEmittingQueryRunner<T>(
|
new MetricsEmittingQueryRunner<T>(
|
||||||
emitter,
|
emitter,
|
||||||
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
new Function<Query<T>, ServiceMetricEvent.Builder>()
|
||||||
|
|
Loading…
Reference in New Issue