Merge pull request #856 from metamx/druid-845

Fix query by segment
This commit is contained in:
xvrl 2014-11-14 13:10:54 -08:00
commit a4fc64ca3f
9 changed files with 147 additions and 54 deletions

View File

@ -19,15 +19,17 @@
package io.druid.query;
import org.joda.time.Interval;
import java.util.List;
/**
*/
public interface BySegmentResultValue<T>
{
public List<Result<T>> getResults();
public List<T> getResults();
public String getSegmentId();
public String getIntervalString();
public Interval getInterval();
}

View File

@ -26,7 +26,7 @@ import java.util.List;
/**
*/
public class BySegmentResultValueClass<T>
public class BySegmentResultValueClass<T> implements BySegmentResultValue<T>
{
private final List<T> results;
private final String segmentId;
@ -43,18 +43,21 @@ public class BySegmentResultValueClass<T>
this.interval = interval;
}
@Override
@JsonProperty("results")
public List<T> getResults()
{
return results;
}
@Override
@JsonProperty("segment")
public String getSegmentId()
{
return segmentId;
}
@Override
@JsonProperty("interval")
public Interval getInterval()
{

View File

@ -84,7 +84,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
throw new ISE("Cannot have a null result!");
}
BySegmentResultValueClass<T> resultsClass = result.getValue();
BySegmentResultValue<T> resultsClass = result.getValue();
return (T) new Result<BySegmentResultValueClass>(
result.getTimestamp(),

View File

@ -24,28 +24,30 @@ import com.fasterxml.jackson.annotation.JsonValue;
import io.druid.query.BySegmentResultValue;
import io.druid.query.Result;
import io.druid.query.search.search.SearchHit;
import org.joda.time.Interval;
import java.util.List;
/**
*/
public class BySegmentSearchResultValue extends SearchResultValue implements BySegmentResultValue<SearchResultValue>
public class BySegmentSearchResultValue extends SearchResultValue
implements BySegmentResultValue<Result<SearchResultValue>>
{
private final List<Result<SearchResultValue>> results;
private final String segmentId;
private final String intervalString;
private final Interval interval;
public BySegmentSearchResultValue(
@JsonProperty("results") List<Result<SearchResultValue>> results,
@JsonProperty("segment") String segmentId,
@JsonProperty("interval") String intervalString
@JsonProperty("interval") Interval interval
)
{
super(null);
this.results = results;
this.segmentId = segmentId;
this.intervalString = intervalString;
this.interval = interval;
}
@Override
@ -71,9 +73,9 @@ public class BySegmentSearchResultValue extends SearchResultValue implements ByS
@Override
@JsonProperty("interval")
public String getIntervalString()
public Interval getInterval()
{
return intervalString;
return interval;
}
@Override
@ -82,7 +84,7 @@ public class BySegmentSearchResultValue extends SearchResultValue implements ByS
return "BySegmentSearchResultValue{" +
"results=" + results +
", segmentId='" + segmentId + '\'' +
", intervalString='" + intervalString + '\'' +
", interval='" + interval.toString() + '\'' +
'}';
}
}

View File

@ -329,7 +329,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
),
value.getSegmentId(),
value.getIntervalString()
value.getInterval()
)
);
}

View File

@ -23,30 +23,32 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Result;
import org.joda.time.Interval;
import java.util.List;
/**
*/
public class BySegmentTopNResultValue extends TopNResultValue implements BySegmentResultValue<TopNResultValue>
public class BySegmentTopNResultValue extends TopNResultValue implements BySegmentResultValue<Result<TopNResultValue>>
{
private final List<Result<TopNResultValue>> results;
private final String segmentId;
private final String intervalString;
private final Interval interval;
@JsonCreator
public BySegmentTopNResultValue(
@JsonProperty("results") List<Result<TopNResultValue>> results,
@JsonProperty("segment") String segmentId,
@JsonProperty("interval") String intervalString
@JsonProperty("interval") Interval interval
)
{
super(null);
this.results = results;
this.segmentId = segmentId;
this.intervalString = intervalString;
this.interval = interval;
}
@Override
@ -73,9 +75,9 @@ public class BySegmentTopNResultValue extends TopNResultValue implements BySegme
@Override
@JsonProperty("interval")
public String getIntervalString()
public Interval getInterval()
{
return intervalString;
return interval;
}
@Override
@ -84,7 +86,7 @@ public class BySegmentTopNResultValue extends TopNResultValue implements BySegme
return "BySegmentTopNResultValue{" +
"results=" + results +
", segmentId='" + segmentId + '\'' +
", intervalString='" + intervalString + '\'' +
", interval='" + interval.toString() + '\'' +
'}';
}
}

View File

@ -35,6 +35,8 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BySegmentResultValue;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
@ -433,7 +435,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
{
if (isBySegment) {
BySegmentTopNResultValue value = (BySegmentTopNResultValue) input.getValue();
BySegmentResultValue<Result<TopNResultValue>> value = (BySegmentResultValue<Result<TopNResultValue>>) input
.getValue();
return new Result<TopNResultValue>(
input.getTimestamp(),
@ -460,7 +463,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
),
value.getSegmentId(),
value.getIntervalString()
value.getInterval()
)
);
}

View File

@ -19,10 +19,12 @@
package io.druid.query.topn;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.query.BySegmentResultValueClass;
@ -47,6 +49,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -167,7 +170,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -231,7 +234,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -296,11 +299,89 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@Test
public void testTopNBySegment()
{
final HashMap<String, Object> specialContext = new HashMap<String, Object>();
specialContext.put("bySegment", "true");
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(marketDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.context(specialContext)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"addRowsIndexConstant", 5356.814697265625D,
"index", 5351.814697265625D,
marketDimension, "total_market",
"uniques", QueryRunnerTestHelper.UNIQUES_2,
"rows", 4L
),
ImmutableMap.<String, Object>of(
"addRowsIndexConstant", 4880.669677734375D,
"index", 4875.669677734375D,
marketDimension, "upfront",
"uniques", QueryRunnerTestHelper.UNIQUES_2,
"rows", 4L
),
ImmutableMap.<String, Object>of(
"addRowsIndexConstant", 2250.8768157958984D,
"index", 2231.8768157958984D,
marketDimension, "spot",
"uniques", QueryRunnerTestHelper.UNIQUES_9,
"rows", 18L
)
)
)
)
);
Sequence<Result<TopNResultValue>> results = new TopNQueryQueryToolChest(new TopNQueryConfig()).postMergeQueryDecoration(
runner
).run(
query,
specialContext
);
List<Result<BySegmentTopNResultValue>> resultList = Sequences.toList(
Sequences.map(
results, new Function<Result<TopNResultValue>, Result<BySegmentTopNResultValue>>()
{
@Nullable
@Override
public Result<BySegmentTopNResultValue> apply(
Result<TopNResultValue> input
)
{
return new Result<BySegmentTopNResultValue>(
input.getTimestamp(),
(BySegmentTopNResultValue) input.getValue()
);
}
}
),
Lists.<Result<BySegmentTopNResultValue>>newArrayList()
);
TestHelper.assertExpectedResults(expectedResults, resultList.get(0).getValue().getResults());
}
@Test
public void testTopN()
{
@ -346,7 +427,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -395,7 +476,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -444,7 +525,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -486,7 +567,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -521,7 +602,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -570,7 +651,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -623,7 +704,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -665,7 +746,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -683,7 +764,7 @@ public class TopNQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
Lists.<Result<TopNResultValue>>newArrayList(
new Result<TopNResultValue>(
@ -722,7 +803,7 @@ public class TopNQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
Lists.<Result<TopNResultValue>>newArrayList(
new Result<TopNResultValue>(
@ -748,7 +829,7 @@ public class TopNQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
Sequences.toList(
runner.run(
@ -783,7 +864,7 @@ public class TopNQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
Sequences.toList(
runner.run(
@ -843,7 +924,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -892,7 +973,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -948,7 +1029,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -996,7 +1077,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1037,7 +1118,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1078,7 +1159,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1119,7 +1200,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1160,7 +1241,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1212,7 +1293,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1264,7 +1345,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1316,7 +1397,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1361,7 +1442,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1407,7 +1488,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1452,7 +1533,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1501,7 +1582,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1586,7 +1667,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
@ -1669,7 +1750,7 @@ public class TopNQueryRunnerTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
}

View File

@ -87,7 +87,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new UnionQueryRunner<>(
new UnionQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()