Merge pull request #494 from metamx/fix-bysegment-results

fix bySegmentResults
This commit is contained in:
fjy 2014-04-21 10:59:47 -06:00
commit 6dd8a38cc7
6 changed files with 168 additions and 117 deletions

View File

@ -174,6 +174,12 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${apache.curator.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>

View File

@ -64,10 +64,44 @@ public class BySegmentResultValueClass<T>
@Override
public String toString()
{
return "BySegmentTimeseriesResultValue{" +
return "BySegmentResultValue{" +
"results=" + results +
", segmentId='" + segmentId + '\'' +
", interval='" + interval + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BySegmentResultValueClass that = (BySegmentResultValueClass) o;
if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
return false;
}
if (results != null ? !results.equals(that.results) : that.results != null) {
return false;
}
if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = results != null ? results.hashCode() : 0;
result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0);
result = 31 * result + (interval != null ? interval.hashCode() : 0);
return result;
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FinalizeMetricManipulationFn;
import io.druid.query.aggregation.IdentityMetricManipulationFn;
import io.druid.query.aggregation.MetricManipulationFn;
@ -55,46 +54,45 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
final Query<T> queryToRun;
final Function<T, T> finalizerFn;
final MetricManipulationFn metricManipulationFn;
if (shouldFinalize) {
queryToRun = query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false));
metricManipulationFn = new FinalizeMetricManipulationFn();
if (isBySegment) {
finalizerFn = new Function<T, T>()
{
final Function<T, T> baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
new FinalizeMetricManipulationFn()
);
@Override
@SuppressWarnings("unchecked")
public T apply(@Nullable T input)
{
Result<BySegmentResultValueClass<T>> result = (Result<BySegmentResultValueClass<T>>) input;
BySegmentResultValueClass<T> resultsClass = result.getValue();
return (T) new Result<BySegmentResultValueClass>(
result.getTimestamp(),
new BySegmentResultValueClass(
Lists.transform(resultsClass.getResults(), baseFinalizer),
resultsClass.getSegmentId(),
resultsClass.getInterval()
)
);
}
};
} else {
finalizerFn = toolChest.makePostComputeManipulatorFn(query, new FinalizeMetricManipulationFn());
}
} else {
// finalize is false here.
queryToRun = query;
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new IdentityMetricManipulationFn()
);
metricManipulationFn = new IdentityMetricManipulationFn();
}
if (isBySegment) {
finalizerFn = new Function<T, T>()
{
final Function<T, T> baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
metricManipulationFn
);
@Override
@SuppressWarnings("unchecked")
public T apply(@Nullable T input)
{
Result<BySegmentResultValueClass<T>> result = (Result<BySegmentResultValueClass<T>>) input;
BySegmentResultValueClass<T> resultsClass = result.getValue();
return (T) new Result<BySegmentResultValueClass>(
result.getTimestamp(),
new BySegmentResultValueClass(
Lists.transform(resultsClass.getResults(), baseFinalizer),
resultsClass.getSegmentId(),
resultsClass.getInterval()
)
);
}
};
} else {
finalizerFn = toolChest.makePostComputeManipulatorFn(query, metricManipulationFn);
}
return Sequences.map(
baseRunner.run(queryToRun),

View File

@ -171,7 +171,10 @@ public class QueryRunnerTestHelper
)
{
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
);
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Druids;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
@ -64,7 +65,7 @@ public class TopNQueryRunnerTest
{
List<Object> retVal = Lists.newArrayList();
retVal.addAll(
TopNQueryRunnerTestHelper.makeQueryRunners(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
@ -72,7 +73,7 @@ public class TopNQueryRunnerTest
)
);
retVal.addAll(
TopNQueryRunnerTestHelper.makeQueryRunners(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
@ -698,13 +699,13 @@ public class TopNQueryRunnerTest
.fields(
Lists.<DimFilter>newArrayList(
Druids.newSelectorDimFilterBuilder()
.dimension(providerDimension)
.value("billyblank")
.build(),
.dimension(providerDimension)
.value("billyblank")
.build(),
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.qualityDimension)
.value("mezzanine")
.build()
.dimension(QueryRunnerTestHelper.qualityDimension)
.value("mezzanine")
.build()
)
).build();
TopNQuery query = new TopNQueryBuilder()
@ -1260,4 +1261,86 @@ public class TopNQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
@Test
public void testTopNBySegmentResults()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.providerDimension)
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new MaxAggregatorFactory("maxIndex", "index"),
new MinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(
Arrays.<PostAggregator>asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
)
)
.context(ImmutableMap.<String, Object>of("finalize", true, "bySegment", true))
.build();
TopNResultValue topNResult = new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 216053.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 192420.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
)
);
List<Result<BySegmentResultValueClass>> expectedResults = Arrays.asList(
new Result<BySegmentResultValueClass>(
new DateTime("2011-01-12T00:00:00.000Z"),
new BySegmentResultValueClass(
Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
topNResult
)
),
QueryRunnerTestHelper.segmentId,
new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z")
)
)
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
}

View File

@ -1,73 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
public class TopNQueryRunnerTestHelper
{
@SuppressWarnings("unchecked")
public static Collection<?> makeQueryRunners(
QueryRunnerFactory factory
)
throws IOException
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, null))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex))
}
}
);
}
public static <T> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}