mirror of
https://github.com/apache/druid.git
synced 2025-02-06 01:58:20 +00:00
Merge pull request #604 from metamx/more-tests
All manners of "new" tests
This commit is contained in:
commit
1c1f65ab5f
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
package io.druid.query.search;
|
package io.druid.query.search;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
@ -49,10 +48,8 @@ import io.druid.segment.column.BitmapIndex;
|
|||||||
import io.druid.segment.column.Column;
|
import io.druid.segment.column.Column;
|
||||||
import io.druid.segment.data.IndexedInts;
|
import io.druid.segment.data.IndexedInts;
|
||||||
import io.druid.segment.filter.Filters;
|
import io.druid.segment.filter.Filters;
|
||||||
import it.uniroma3.mat.extendedset.intset.ConciseSet;
|
|
||||||
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
|
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
@ -97,14 +94,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
|
|||||||
|
|
||||||
final ImmutableConciseSet baseFilter;
|
final ImmutableConciseSet baseFilter;
|
||||||
if (filter == null) {
|
if (filter == null) {
|
||||||
// Accept all, and work around https://github.com/metamx/extendedset/issues/1
|
baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows());
|
||||||
if (index.getNumRows() == 1) {
|
|
||||||
ConciseSet set = new ConciseSet();
|
|
||||||
set.add(0);
|
|
||||||
baseFilter = ImmutableConciseSet.newImmutableFromMutable(set);
|
|
||||||
} else {
|
|
||||||
baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index));
|
baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index));
|
||||||
}
|
}
|
||||||
|
@ -324,12 +324,6 @@ public class IndexMerger
|
|||||||
throw new ISE("Couldn't make outdir[%s].", outDir);
|
throw new ISE("Couldn't make outdir[%s].", outDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
if (indexes.size() < 2) {
|
|
||||||
throw new ISE("Too few indexes provided for append [%d].", indexes.size());
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
final List<String> mergedDimensions = mergeIndexed(
|
final List<String> mergedDimensions = mergeIndexed(
|
||||||
Lists.transform(
|
Lists.transform(
|
||||||
indexes,
|
indexes,
|
||||||
|
712
processing/src/test/java/io/druid/segment/AppendTest.java
Normal file
712
processing/src/test/java/io/druid/segment/AppendTest.java
Normal file
@ -0,0 +1,712 @@
|
|||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.Result;
|
||||||
|
import io.druid.query.TestQueryRunners;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.MinAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.PostAggregator;
|
||||||
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||||
|
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||||
|
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||||
|
import io.druid.query.filter.DimFilter;
|
||||||
|
import io.druid.query.search.SearchResultValue;
|
||||||
|
import io.druid.query.search.search.SearchHit;
|
||||||
|
import io.druid.query.search.search.SearchQuery;
|
||||||
|
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
|
import io.druid.query.spec.QuerySegmentSpec;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryResultValue;
|
||||||
|
import io.druid.query.timeseries.TimeseriesQuery;
|
||||||
|
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||||
|
import io.druid.query.topn.TopNQuery;
|
||||||
|
import io.druid.query.topn.TopNQueryBuilder;
|
||||||
|
import io.druid.query.topn.TopNResultValue;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class AppendTest
|
||||||
|
{
|
||||||
|
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
|
||||||
|
new DoubleSumAggregatorFactory("index", "index"),
|
||||||
|
new CountAggregatorFactory("count"),
|
||||||
|
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
|
||||||
|
};
|
||||||
|
private static final AggregatorFactory[] METRIC_AGGS_NO_UNIQ = new AggregatorFactory[]{
|
||||||
|
new DoubleSumAggregatorFactory("index", "index"),
|
||||||
|
new CountAggregatorFactory("count")
|
||||||
|
};
|
||||||
|
|
||||||
|
final String dataSource = "testing";
|
||||||
|
final QueryGranularity allGran = QueryGranularity.ALL;
|
||||||
|
final String dimensionValue = "dimension";
|
||||||
|
final String valueValue = "value";
|
||||||
|
final String providerDimension = "provider";
|
||||||
|
final String qualityDimension = "quality";
|
||||||
|
final String placementDimension = "placement";
|
||||||
|
final String placementishDimension = "placementish";
|
||||||
|
final String indexMetric = "index";
|
||||||
|
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||||
|
final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
|
||||||
|
final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
|
||||||
|
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
|
||||||
|
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||||
|
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||||
|
final ArithmeticPostAggregator addRowsIndexConstant =
|
||||||
|
new ArithmeticPostAggregator(
|
||||||
|
"addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
|
||||||
|
);
|
||||||
|
final List<AggregatorFactory> commonAggregators = Arrays.asList(rowsCount, indexDoubleSum, uniques);
|
||||||
|
|
||||||
|
final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec(
|
||||||
|
Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"))
|
||||||
|
);
|
||||||
|
|
||||||
|
private Segment segment;
|
||||||
|
private Segment segment2;
|
||||||
|
private Segment segment3;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
// (1, 2) cover overlapping segments of the form
|
||||||
|
// |------|
|
||||||
|
// |--------|
|
||||||
|
QueryableIndex appendedIndex = SchemalessIndex.getAppendedIncrementalIndex(
|
||||||
|
Arrays.asList(
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.1", METRIC_AGGS_NO_UNIQ),
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.2", METRIC_AGGS)
|
||||||
|
),
|
||||||
|
Arrays.asList(
|
||||||
|
new Interval("2011-01-12T00:00:00.000Z/2011-01-16T00:00:00.000Z"),
|
||||||
|
new Interval("2011-01-14T22:00:00.000Z/2011-01-16T00:00:00.000Z")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
segment = new QueryableIndexSegment(null, appendedIndex);
|
||||||
|
|
||||||
|
// (3, 4) cover overlapping segments of the form
|
||||||
|
// |------------|
|
||||||
|
// |-----|
|
||||||
|
QueryableIndex append2 = SchemalessIndex.getAppendedIncrementalIndex(
|
||||||
|
Arrays.asList(
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.3", METRIC_AGGS_NO_UNIQ),
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.4", METRIC_AGGS)
|
||||||
|
),
|
||||||
|
Arrays.asList(
|
||||||
|
new Interval("2011-01-12T00:00:00.000Z/2011-01-16T00:00:00.000Z"),
|
||||||
|
new Interval("2011-01-13T00:00:00.000Z/2011-01-14T00:00:00.000Z")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
segment2 = new QueryableIndexSegment(null, append2);
|
||||||
|
|
||||||
|
// (5, 6, 7) test gaps that can be created in data because of rows being discounted
|
||||||
|
// |-------------|
|
||||||
|
// |---|
|
||||||
|
// |---|
|
||||||
|
QueryableIndex append3 = SchemalessIndex.getAppendedIncrementalIndex(
|
||||||
|
Arrays.asList(
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.5", METRIC_AGGS),
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.6", METRIC_AGGS),
|
||||||
|
new Pair<String, AggregatorFactory[]>("append.json.7", METRIC_AGGS)
|
||||||
|
),
|
||||||
|
Arrays.asList(
|
||||||
|
new Interval("2011-01-12T00:00:00.000Z/2011-01-22T00:00:00.000Z"),
|
||||||
|
new Interval("2011-01-13T00:00:00.000Z/2011-01-16T00:00:00.000Z"),
|
||||||
|
new Interval("2011-01-18T00:00:00.000Z/2011-01-21T00:00:00.000Z")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
segment3 = new QueryableIndexSegment(null, append3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeBoundary()
|
||||||
|
{
|
||||||
|
List<Result<TimeBoundaryResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeBoundaryResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeBoundaryResultValue(
|
||||||
|
ImmutableMap.of(
|
||||||
|
TimeBoundaryQuery.MIN_TIME,
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
TimeBoundaryQuery.MAX_TIME,
|
||||||
|
new DateTime("2011-01-15T02:00:00.000Z")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.build();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeBoundaryQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeBoundary2()
|
||||||
|
{
|
||||||
|
List<Result<TimeBoundaryResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeBoundaryResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeBoundaryResultValue(
|
||||||
|
ImmutableMap.of(
|
||||||
|
TimeBoundaryQuery.MIN_TIME,
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
TimeBoundaryQuery.MAX_TIME,
|
||||||
|
new DateTime("2011-01-15T00:00:00.000Z")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.build();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeBoundaryQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeSeries()
|
||||||
|
{
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("rows", 8L)
|
||||||
|
.put("index", 700.0D)
|
||||||
|
.put("addRowsIndexConstant", 709.0D)
|
||||||
|
.put("uniques", 1.0002442201269182D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 0.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeseriesQuery query = makeTimeseriesQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeSeries2()
|
||||||
|
{
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("rows", 7L)
|
||||||
|
.put("index", 500.0D)
|
||||||
|
.put("addRowsIndexConstant", 508.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 0.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeseriesQuery query = makeTimeseriesQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredTimeSeries()
|
||||||
|
{
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("rows", 5L)
|
||||||
|
.put("index", 500.0D)
|
||||||
|
.put("addRowsIndexConstant", 506.0D)
|
||||||
|
.put("uniques", 1.0002442201269182D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 100.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeseriesQuery query = makeFilteredTimeseriesQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredTimeSeries2()
|
||||||
|
{
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("rows", 4L)
|
||||||
|
.put("index", 400.0D)
|
||||||
|
.put("addRowsIndexConstant", 405.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 100.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeseriesQuery query = makeFilteredTimeseriesQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTopNSeries()
|
||||||
|
{
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TopNResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Arrays.<Map<String, Object>>asList(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("provider", "spot")
|
||||||
|
.put("rows", 3L)
|
||||||
|
.put("index", 300.0D)
|
||||||
|
.put("addRowsIndexConstant", 304.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0)
|
||||||
|
.put("minIndex", 100.0)
|
||||||
|
.build(),
|
||||||
|
new HashMap<String, Object>()
|
||||||
|
{{
|
||||||
|
put("provider", null);
|
||||||
|
put("rows", 3L);
|
||||||
|
put("index", 200.0D);
|
||||||
|
put("addRowsIndexConstant", 204.0D);
|
||||||
|
put("uniques", 0.0D);
|
||||||
|
put("maxIndex", 100.0);
|
||||||
|
put("minIndex", 0.0);
|
||||||
|
}},
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("provider", "total_market")
|
||||||
|
.put("rows", 2L)
|
||||||
|
.put("index", 200.0D)
|
||||||
|
.put("addRowsIndexConstant", 203.0D)
|
||||||
|
.put("uniques", 1.0002442201269182D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 100.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TopNQuery query = makeTopNQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTopNSeries2()
|
||||||
|
{
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TopNResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Arrays.<Map<String, Object>>asList(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("provider", "total_market")
|
||||||
|
.put("rows", 3L)
|
||||||
|
.put("index", 300.0D)
|
||||||
|
.put("addRowsIndexConstant", 304.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 100.0D)
|
||||||
|
.build(),
|
||||||
|
new HashMap<String, Object>()
|
||||||
|
{{
|
||||||
|
put("provider", null);
|
||||||
|
put("rows", 3L);
|
||||||
|
put("index", 100.0D);
|
||||||
|
put("addRowsIndexConstant", 104.0D);
|
||||||
|
put("uniques", 0.0D);
|
||||||
|
put("maxIndex", 100.0);
|
||||||
|
put("minIndex", 0.0);
|
||||||
|
}},
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("provider", "spot")
|
||||||
|
.put("rows", 1L)
|
||||||
|
.put("index", 100.0D)
|
||||||
|
.put("addRowsIndexConstant", 102.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0)
|
||||||
|
.put("minIndex", 100.0)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TopNQuery query = makeTopNQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredTopNSeries()
|
||||||
|
{
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TopNResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Arrays.<Map<String, Object>>asList(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("provider", "spot")
|
||||||
|
.put("rows", 1L)
|
||||||
|
.put("index", 100.0D)
|
||||||
|
.put("addRowsIndexConstant", 102.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0)
|
||||||
|
.put("minIndex", 100.0)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TopNQuery query = makeFilteredTopNQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredTopNSeries2()
|
||||||
|
{
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TopNResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Lists.<Map<String, Object>>newArrayList()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TopNQuery query = makeFilteredTopNQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSearch()
|
||||||
|
{
|
||||||
|
List<Result<SearchResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<SearchResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new SearchResultValue(
|
||||||
|
Arrays.<SearchHit>asList(
|
||||||
|
new SearchHit(placementishDimension, "a"),
|
||||||
|
new SearchHit(qualityDimension, "automotive"),
|
||||||
|
new SearchHit(placementDimension, "mezzanine"),
|
||||||
|
new SearchHit(providerDimension, "total_market")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
SearchQuery query = makeSearchQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSearchWithOverlap()
|
||||||
|
{
|
||||||
|
List<Result<SearchResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<SearchResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new SearchResultValue(
|
||||||
|
Arrays.<SearchHit>asList(
|
||||||
|
new SearchHit(placementishDimension, "a"),
|
||||||
|
new SearchHit(placementDimension, "mezzanine"),
|
||||||
|
new SearchHit(providerDimension, "total_market")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
SearchQuery query = makeSearchQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredSearch()
|
||||||
|
{
|
||||||
|
List<Result<SearchResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<SearchResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new SearchResultValue(
|
||||||
|
Arrays.<SearchHit>asList(
|
||||||
|
new SearchHit(placementDimension, "mezzanine"),
|
||||||
|
new SearchHit(providerDimension, "total_market")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
SearchQuery query = makeFilteredSearchQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilteredSearch2()
|
||||||
|
{
|
||||||
|
List<Result<SearchResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<SearchResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new SearchResultValue(
|
||||||
|
Arrays.<SearchHit>asList(
|
||||||
|
new SearchHit(placementishDimension, "a"),
|
||||||
|
new SearchHit(placementDimension, "mezzanine"),
|
||||||
|
new SearchHit(providerDimension, "total_market")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
SearchQuery query = makeFilteredSearchQuery();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment2);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRowFiltering()
|
||||||
|
{
|
||||||
|
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<TimeseriesResultValue>(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||||
|
new TimeseriesResultValue(
|
||||||
|
ImmutableMap.<String, Object>builder()
|
||||||
|
.put("rows", 5L)
|
||||||
|
.put("index", 500.0D)
|
||||||
|
.put("addRowsIndexConstant", 506.0D)
|
||||||
|
.put("uniques", 0.0D)
|
||||||
|
.put("maxIndex", 100.0D)
|
||||||
|
.put("minIndex", 100.0D)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.filters(providerDimension, "breakstuff")
|
||||||
|
.aggregators(
|
||||||
|
Lists.<AggregatorFactory>newArrayList(
|
||||||
|
Iterables.concat(
|
||||||
|
commonAggregators,
|
||||||
|
Lists.newArrayList(
|
||||||
|
new MaxAggregatorFactory("maxIndex", "index"),
|
||||||
|
new MinAggregatorFactory("minIndex", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment3);
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimeseriesQuery makeTimeseriesQuery()
|
||||||
|
{
|
||||||
|
return Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.aggregators(
|
||||||
|
Lists.<AggregatorFactory>newArrayList(
|
||||||
|
Iterables.concat(
|
||||||
|
commonAggregators,
|
||||||
|
Lists.newArrayList(
|
||||||
|
new MaxAggregatorFactory("maxIndex", "index"),
|
||||||
|
new MinAggregatorFactory("minIndex", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimeseriesQuery makeFilteredTimeseriesQuery()
|
||||||
|
{
|
||||||
|
return Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.filters(
|
||||||
|
Druids.newOrDimFilterBuilder()
|
||||||
|
.fields(
|
||||||
|
Arrays.<DimFilter>asList(
|
||||||
|
Druids.newSelectorDimFilterBuilder()
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.value("spot")
|
||||||
|
.build(),
|
||||||
|
Druids.newSelectorDimFilterBuilder()
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.value("total_market")
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
).build()
|
||||||
|
)
|
||||||
|
.aggregators(
|
||||||
|
Lists.<AggregatorFactory>newArrayList(
|
||||||
|
Iterables.concat(
|
||||||
|
commonAggregators,
|
||||||
|
Lists.newArrayList(
|
||||||
|
new MaxAggregatorFactory("maxIndex", "index"),
|
||||||
|
new MinAggregatorFactory("minIndex", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopNQuery makeTopNQuery()
|
||||||
|
{
|
||||||
|
return new TopNQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.metric(indexMetric)
|
||||||
|
.threshold(3)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.aggregators(
|
||||||
|
Lists.<AggregatorFactory>newArrayList(
|
||||||
|
Iterables.concat(
|
||||||
|
commonAggregators,
|
||||||
|
Lists.newArrayList(
|
||||||
|
new MaxAggregatorFactory("maxIndex", "index"),
|
||||||
|
new MinAggregatorFactory("minIndex", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopNQuery makeFilteredTopNQuery()
|
||||||
|
{
|
||||||
|
return new TopNQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.metric(indexMetric)
|
||||||
|
.threshold(3)
|
||||||
|
.filters(
|
||||||
|
Druids.newAndDimFilterBuilder()
|
||||||
|
.fields(
|
||||||
|
Arrays.<DimFilter>asList(
|
||||||
|
Druids.newSelectorDimFilterBuilder()
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.value("spot")
|
||||||
|
.build(),
|
||||||
|
Druids.newSelectorDimFilterBuilder()
|
||||||
|
.dimension(placementDimension)
|
||||||
|
.value("preferred")
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
).build()
|
||||||
|
)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.aggregators(
|
||||||
|
Lists.<AggregatorFactory>newArrayList(
|
||||||
|
Iterables.concat(
|
||||||
|
commonAggregators,
|
||||||
|
Lists.newArrayList(
|
||||||
|
new MaxAggregatorFactory("maxIndex", "index"),
|
||||||
|
new MinAggregatorFactory("minIndex", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private SearchQuery makeSearchQuery()
|
||||||
|
{
|
||||||
|
return Druids.newSearchQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.granularity(allGran)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.query("a")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private SearchQuery makeFilteredSearchQuery()
|
||||||
|
{
|
||||||
|
return Druids.newSearchQueryBuilder()
|
||||||
|
.dataSource(dataSource)
|
||||||
|
.filters(
|
||||||
|
Druids.newNotDimFilterBuilder()
|
||||||
|
.field(
|
||||||
|
Druids.newSelectorDimFilterBuilder()
|
||||||
|
.dimension(providerDimension)
|
||||||
|
.value("spot")
|
||||||
|
.build()
|
||||||
|
).build()
|
||||||
|
)
|
||||||
|
.granularity(allGran)
|
||||||
|
.intervals(fullOnInterval)
|
||||||
|
.query("a")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
512
processing/src/test/java/io/druid/segment/SchemalessIndex.java
Normal file
512
processing/src/test/java/io/druid/segment/SchemalessIndex.java
Normal file
@ -0,0 +1,512 @@
|
|||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
import com.google.common.hash.Hashing;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
|
import io.druid.segment.serde.ComplexMetrics;
|
||||||
|
import io.druid.timeline.TimelineObjectHolder;
|
||||||
|
import io.druid.timeline.VersionedIntervalTimeline;
|
||||||
|
import io.druid.timeline.partition.NoneShardSpec;
|
||||||
|
import io.druid.timeline.partition.ShardSpec;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class SchemalessIndex
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(SchemalessIndex.class);
|
||||||
|
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
private static final String testFile = "druid.sample.json";
|
||||||
|
private static final String TIMESTAMP = "timestamp";
|
||||||
|
private static final List<String> METRICS = Arrays.asList("index");
|
||||||
|
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
|
||||||
|
new DoubleSumAggregatorFactory("index", "index"),
|
||||||
|
new CountAggregatorFactory("count"),
|
||||||
|
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
|
||||||
|
};
|
||||||
|
private static final AggregatorFactory[] METRIC_AGGS_NO_UNIQ = new AggregatorFactory[]{
|
||||||
|
new DoubleSumAggregatorFactory("index", "index"),
|
||||||
|
new CountAggregatorFactory("count")
|
||||||
|
};
|
||||||
|
|
||||||
|
private static final List<Map<String, Object>> events = Lists.newArrayList();
|
||||||
|
|
||||||
|
private static final Map<Integer, Map<Integer, QueryableIndex>> incrementalIndexes = Maps.newHashMap();
|
||||||
|
private static final Map<Integer, Map<Integer, QueryableIndex>> mergedIndexes = Maps.newHashMap();
|
||||||
|
private static final List<QueryableIndex> rowPersistedIndexes = Lists.newArrayList();
|
||||||
|
|
||||||
|
private static IncrementalIndex index = null;
|
||||||
|
private static QueryableIndex mergedIndex = null;
|
||||||
|
|
||||||
|
static {
|
||||||
|
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
|
||||||
|
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IncrementalIndex getIncrementalIndex()
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
if (index != null) {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
index = makeIncrementalIndex(testFile, METRIC_AGGS);
|
||||||
|
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getIncrementalIndex(int index1, int index2)
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
if (events.isEmpty()) {
|
||||||
|
makeEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<Integer, QueryableIndex> entry = incrementalIndexes.get(index1);
|
||||||
|
if (entry != null) {
|
||||||
|
QueryableIndex index = entry.get(index2);
|
||||||
|
if (index != null) {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entry = Maps.<Integer, QueryableIndex>newHashMap();
|
||||||
|
incrementalIndexes.put(index1, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
IncrementalIndex theIndex = null;
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (final Map<String, Object> event : events) {
|
||||||
|
if (count != index1 && count != index2) {
|
||||||
|
count++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis();
|
||||||
|
|
||||||
|
if (theIndex == null) {
|
||||||
|
theIndex = new IncrementalIndex(timestamp, QueryGranularity.MINUTE, METRIC_AGGS);
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<String> dims = Lists.newArrayList();
|
||||||
|
for (final Map.Entry<String, Object> val : event.entrySet()) {
|
||||||
|
if (!val.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(val.getKey())) {
|
||||||
|
dims.add(val.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
QueryableIndex retVal = TestIndex.persistRealtimeAndLoadMMapped(theIndex);
|
||||||
|
entry.put(index2, retVal);
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getMergedIncrementalIndex()
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
if (mergedIndex != null) {
|
||||||
|
return mergedIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
IncrementalIndex top = makeIncrementalIndex("druid.sample.json.top", METRIC_AGGS);
|
||||||
|
IncrementalIndex bottom = makeIncrementalIndex("druid.sample.json.bottom", METRIC_AGGS);
|
||||||
|
|
||||||
|
File tmpFile = File.createTempFile("yay", "who");
|
||||||
|
tmpFile.delete();
|
||||||
|
|
||||||
|
File topFile = new File(tmpFile, "top");
|
||||||
|
File bottomFile = new File(tmpFile, "bottom");
|
||||||
|
File mergedFile = new File(tmpFile, "merged");
|
||||||
|
|
||||||
|
topFile.mkdirs();
|
||||||
|
topFile.deleteOnExit();
|
||||||
|
bottomFile.mkdirs();
|
||||||
|
bottomFile.deleteOnExit();
|
||||||
|
mergedFile.mkdirs();
|
||||||
|
mergedFile.deleteOnExit();
|
||||||
|
|
||||||
|
IndexMerger.persist(top, topFile);
|
||||||
|
IndexMerger.persist(bottom, bottomFile);
|
||||||
|
|
||||||
|
mergedIndex = io.druid.segment.IndexIO.loadIndex(
|
||||||
|
IndexMerger.mergeQueryableIndex(
|
||||||
|
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
return mergedIndex;
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
mergedIndex = null;
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getMergedIncrementalIndex(int index1, int index2)
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
if (rowPersistedIndexes.isEmpty()) {
|
||||||
|
makeRowPersistedIndexes();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<Integer, QueryableIndex> entry = mergedIndexes.get(index1);
|
||||||
|
if (entry != null) {
|
||||||
|
QueryableIndex index = entry.get(index2);
|
||||||
|
if (index != null) {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entry = Maps.<Integer, QueryableIndex>newHashMap();
|
||||||
|
mergedIndexes.put(index1, entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
File tmpFile = File.createTempFile("yay", "who");
|
||||||
|
tmpFile.delete();
|
||||||
|
|
||||||
|
File mergedFile = new File(tmpFile, "merged");
|
||||||
|
|
||||||
|
mergedFile.mkdirs();
|
||||||
|
mergedFile.deleteOnExit();
|
||||||
|
|
||||||
|
QueryableIndex index = IndexIO.loadIndex(
|
||||||
|
IndexMerger.mergeQueryableIndex(
|
||||||
|
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
entry.put(index2, index);
|
||||||
|
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getMergedIncrementalIndex(int[] indexes)
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
if (rowPersistedIndexes.isEmpty()) {
|
||||||
|
makeRowPersistedIndexes();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
File tmpFile = File.createTempFile("yay", "who");
|
||||||
|
tmpFile.delete();
|
||||||
|
|
||||||
|
File mergedFile = new File(tmpFile, "merged");
|
||||||
|
|
||||||
|
mergedFile.mkdirs();
|
||||||
|
mergedFile.deleteOnExit();
|
||||||
|
|
||||||
|
List<QueryableIndex> indexesToMerge = Lists.newArrayList();
|
||||||
|
for (int i = 0; i < indexes.length; i++) {
|
||||||
|
indexesToMerge.add(rowPersistedIndexes.get(indexes[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryableIndex index = IndexIO.loadIndex(
|
||||||
|
IndexMerger.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile)
|
||||||
|
);
|
||||||
|
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getAppendedIncrementalIndex(
|
||||||
|
Iterable<Pair<String, AggregatorFactory[]>> files,
|
||||||
|
List<Interval> intervals
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return makeAppendedMMappedIndex(files, intervals);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getMergedIncrementalIndexDiffMetrics()
|
||||||
|
{
|
||||||
|
return getMergedIncrementalIndex(
|
||||||
|
Arrays.<Pair<String, AggregatorFactory[]>>asList(
|
||||||
|
new Pair<String, AggregatorFactory[]>("druid.sample.json.top", METRIC_AGGS_NO_UNIQ),
|
||||||
|
new Pair<String, AggregatorFactory[]>("druid.sample.json.bottom", METRIC_AGGS)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryableIndex getMergedIncrementalIndex(Iterable<Pair<String, AggregatorFactory[]>> files)
|
||||||
|
{
|
||||||
|
return makeMergedMMappedIndex(files);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void makeEvents()
|
||||||
|
{
|
||||||
|
URL resource = TestIndex.class.getClassLoader().getResource(testFile);
|
||||||
|
String filename = resource.getFile();
|
||||||
|
log.info("Realtime loading index file[%s]", filename);
|
||||||
|
try {
|
||||||
|
for (Object obj : jsonMapper.readValue(new File(filename), List.class)) {
|
||||||
|
final Map<String, Object> event = jsonMapper.convertValue(obj, Map.class);
|
||||||
|
events.add(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void makeRowPersistedIndexes()
|
||||||
|
{
|
||||||
|
synchronized (log) {
|
||||||
|
try {
|
||||||
|
if (events.isEmpty()) {
|
||||||
|
makeEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final Map<String, Object> event : events) {
|
||||||
|
|
||||||
|
final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis();
|
||||||
|
final List<String> dims = Lists.newArrayList();
|
||||||
|
for (Map.Entry<String, Object> entry : event.entrySet()) {
|
||||||
|
if (!entry.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(entry.getKey())) {
|
||||||
|
dims.add(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final IncrementalIndex rowIndex = new IncrementalIndex(
|
||||||
|
timestamp, QueryGranularity.MINUTE, METRIC_AGGS
|
||||||
|
);
|
||||||
|
|
||||||
|
rowIndex.add(
|
||||||
|
new MapBasedInputRow(timestamp, dims, event)
|
||||||
|
);
|
||||||
|
|
||||||
|
File tmpFile = File.createTempFile("billy", "yay");
|
||||||
|
tmpFile.delete();
|
||||||
|
tmpFile.mkdirs();
|
||||||
|
tmpFile.deleteOnExit();
|
||||||
|
|
||||||
|
IndexMerger.persist(rowIndex, tmpFile);
|
||||||
|
rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IncrementalIndex makeIncrementalIndex(final String resourceFilename, AggregatorFactory[] aggs)
|
||||||
|
{
|
||||||
|
URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
|
||||||
|
log.info("Realtime loading resource[%s]", resource);
|
||||||
|
String filename = resource.getFile();
|
||||||
|
log.info("Realtime loading index file[%s]", filename);
|
||||||
|
|
||||||
|
final IncrementalIndex retVal = new IncrementalIndex(
|
||||||
|
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.MINUTE, aggs
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final List<Object> events = jsonMapper.readValue(new File(filename), List.class);
|
||||||
|
for (Object obj : events) {
|
||||||
|
final Map<String, Object> event = jsonMapper.convertValue(obj, Map.class);
|
||||||
|
|
||||||
|
final List<String> dims = Lists.newArrayList();
|
||||||
|
for (Map.Entry<String, Object> entry : event.entrySet()) {
|
||||||
|
if (!entry.getKey().equalsIgnoreCase(TIMESTAMP) && !METRICS.contains(entry.getKey())) {
|
||||||
|
dims.add(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retVal.add(
|
||||||
|
new MapBasedInputRow(
|
||||||
|
new DateTime(event.get(TIMESTAMP)).getMillis(),
|
||||||
|
dims,
|
||||||
|
event
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
index = null;
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<File> makeFilesToMap(File tmpFile, Iterable<Pair<String, AggregatorFactory[]>> files)
|
||||||
|
throws IOException
|
||||||
|
{
|
||||||
|
List<File> filesToMap = Lists.newArrayList();
|
||||||
|
for (Pair<String, AggregatorFactory[]> file : files) {
|
||||||
|
IncrementalIndex index = makeIncrementalIndex(file.lhs, file.rhs);
|
||||||
|
File theFile = new File(tmpFile, file.lhs);
|
||||||
|
theFile.mkdirs();
|
||||||
|
theFile.deleteOnExit();
|
||||||
|
filesToMap.add(theFile);
|
||||||
|
IndexMerger.persist(index, theFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
return filesToMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static QueryableIndex makeAppendedMMappedIndex(
|
||||||
|
Iterable<Pair<String, AggregatorFactory[]>> files,
|
||||||
|
final List<Interval> intervals
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
File tmpFile = File.createTempFile("yay", "boo");
|
||||||
|
tmpFile.delete();
|
||||||
|
File mergedFile = new File(tmpFile, "merged");
|
||||||
|
mergedFile.mkdirs();
|
||||||
|
mergedFile.deleteOnExit();
|
||||||
|
|
||||||
|
List<File> filesToMap = makeFilesToMap(tmpFile, files);
|
||||||
|
|
||||||
|
List<IndexableAdapter> adapters = Lists.newArrayList();
|
||||||
|
|
||||||
|
VersionedIntervalTimeline<Integer, File> timeline = new VersionedIntervalTimeline<Integer, File>(
|
||||||
|
Ordering.natural().nullsFirst()
|
||||||
|
);
|
||||||
|
|
||||||
|
ShardSpec noneShardSpec = new NoneShardSpec();
|
||||||
|
|
||||||
|
for (int i = 0; i < intervals.size(); i++) {
|
||||||
|
timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Pair<File, Interval>> intervalsToMerge = Lists.transform(
|
||||||
|
timeline.lookup(new Interval("1000-01-01/3000-01-01")),
|
||||||
|
new Function<TimelineObjectHolder<Integer, File>, Pair<File, Interval>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Pair<File, Interval> apply(@Nullable TimelineObjectHolder<Integer, File> input)
|
||||||
|
{
|
||||||
|
return new Pair<File, Interval>(input.getObject().getChunk(0).getObject(), input.getInterval());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
for (final Pair<File, Interval> pair : intervalsToMerge) {
|
||||||
|
adapters.add(
|
||||||
|
new RowboatFilteringIndexAdapter(
|
||||||
|
new QueryableIndexIndexableAdapter(IndexIO.loadIndex(pair.lhs)),
|
||||||
|
new Predicate<Rowboat>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean apply(@Nullable Rowboat input)
|
||||||
|
{
|
||||||
|
return pair.rhs.contains(input.getTimestamp());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile));
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static QueryableIndex makeMergedMMappedIndex(Iterable<Pair<String, AggregatorFactory[]>> files)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
File tmpFile = File.createTempFile("yay", "who");
|
||||||
|
tmpFile.delete();
|
||||||
|
File mergedFile = new File(tmpFile, "merged");
|
||||||
|
mergedFile.mkdirs();
|
||||||
|
mergedFile.deleteOnExit();
|
||||||
|
|
||||||
|
List<File> filesToMap = makeFilesToMap(tmpFile, files);
|
||||||
|
|
||||||
|
return IndexIO.loadIndex(
|
||||||
|
IndexMerger.mergeQueryableIndex(
|
||||||
|
Lists.newArrayList(
|
||||||
|
Iterables.transform(
|
||||||
|
filesToMap,
|
||||||
|
new Function<File, QueryableIndex>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public QueryableIndex apply(@Nullable File input)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return IndexIO.loadIndex(input);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
METRIC_AGGS,
|
||||||
|
mergedFile
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
1546
processing/src/test/java/io/druid/segment/SchemalessTestFull.java
Normal file
1546
processing/src/test/java/io/druid/segment/SchemalessTestFull.java
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,295 @@
|
|||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.Result;
|
||||||
|
import io.druid.query.TestQueryRunners;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.MinAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.PostAggregator;
|
||||||
|
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||||
|
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||||
|
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||||
|
import io.druid.query.search.SearchResultValue;
|
||||||
|
import io.druid.query.search.search.SearchHit;
|
||||||
|
import io.druid.query.search.search.SearchQuery;
|
||||||
|
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
|
import io.druid.query.spec.QuerySegmentSpec;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryResultValue;
|
||||||
|
import io.druid.query.timeseries.TimeseriesQuery;
|
||||||
|
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||||
|
import io.druid.query.topn.DimensionAndMetricValueExtractor;
|
||||||
|
import io.druid.query.topn.TopNQuery;
|
||||||
|
import io.druid.query.topn.TopNQueryBuilder;
|
||||||
|
import io.druid.query.topn.TopNResultValue;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
 * Full-on timeseries, topN, search, and time boundary queries over the schemaless sample data,
 * run against an in-memory incremental index, a persisted and memory-mapped copy of it, and a
 * merged incremental index.
 */
@RunWith(Parameterized.class)
public class SchemalessTestSimple
{
  @Parameterized.Parameters
  public static Collection<?> constructorFeeder() throws IOException
  {
    final IncrementalIndex incrementalIndex = SchemalessIndex.getIncrementalIndex();
    final QueryableIndex persistedIncrementalIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex);
    final QueryableIndex mergedIncrementalIndex = SchemalessIndex.getMergedIncrementalIndex();

    return Arrays.asList(
        new Object[][]{
            {
                // in-memory incremental index
                new IncrementalIndexSegment(incrementalIndex, null)
            },
            {
                // the same data, persisted and memory-mapped
                new QueryableIndexSegment(
                    null, persistedIncrementalIndex
                )
            },
            {
                // the same data, merged into a single queryable index
                new QueryableIndexSegment(
                    null, mergedIncrementalIndex
                )
            }
        }
    );
  }

  final String dataSource = "testing";
  final QueryGranularity allGran = QueryGranularity.ALL;
  final String dimensionValue = "dimension";
  final String valueValue = "value";
  final String providerDimension = "provider";
  final String qualityDimension = "quality";
  final String placementDimension = "placement";
  final String placementishDimension = "placementish";
  final String indexMetric = "index";
  final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
  final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
  final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques");
  final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
  final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
  final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
  final ArithmeticPostAggregator addRowsIndexConstant =
      new ArithmeticPostAggregator(
          "addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
      );
  final List<AggregatorFactory> commonAggregators = Arrays.asList(rowsCount, indexDoubleSum, uniques);

  final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec(
      Arrays.asList(new Interval("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"))
  );

  private Segment segment;

  public SchemalessTestSimple(
      Segment segment
  )
  {
    this.segment = segment;
  }

  @Test
  public void testFullOnTimeseries()
  {
    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                  .dataSource(dataSource)
                                  .granularity(allGran)
                                  .intervals(fullOnInterval)
                                  .aggregators(
                                      Lists.<AggregatorFactory>newArrayList(
                                          Iterables.concat(
                                              commonAggregators,
                                              Lists.newArrayList(
                                                  new MaxAggregatorFactory("maxIndex", "index"),
                                                  new MinAggregatorFactory("minIndex", "index")
                                              )
                                          )
                                      )
                                  )
                                  .postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
                                  .build();

    List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
        new Result<TimeseriesResultValue>(
            new DateTime("2011-01-12T00:00:00.000Z"),
            new TimeseriesResultValue(
                ImmutableMap.<String, Object>builder()
                            .put("rows", 11L)
                            .put("index", 900.0)
                            .put("addRowsIndexConstant", 912.0)
                            .put("uniques", 2.000977198748901D)
                            .put("maxIndex", 100.0)
                            .put("minIndex", 0.0)
                            .build()
            )
        )
    );
    QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment);
    TestHelper.assertExpectedResults(expectedResults, runner.run(query));
  }

  // @Test TODO: Handling of null values is inconsistent right now, need to make it all consistent and re-enable test
  // TODO: Complain to Eric when you see this. It shouldn't be like this...
  public void testFullOnTopN()
  {
    TopNQuery query = new TopNQueryBuilder()
        .dataSource(dataSource)
        .granularity(allGran)
        .dimension(providerDimension)
        .metric(indexMetric)
        .threshold(3)
        .intervals(fullOnInterval)
        .aggregators(
            Lists.<AggregatorFactory>newArrayList(
                Iterables.concat(
                    commonAggregators,
                    Lists.newArrayList(
                        new MaxAggregatorFactory("maxIndex", "index"),
                        new MinAggregatorFactory("minIndex", "index")
                    )
                )
            )
        )
        .postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
        .build();

    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
        new Result<TopNResultValue>(
            new DateTime("2011-01-12T00:00:00.000Z"),
            new TopNResultValue(
                Arrays.<DimensionAndMetricValueExtractor>asList(
                    new DimensionAndMetricValueExtractor(
                        ImmutableMap.<String, Object>builder()
                                    .put("provider", "spot")
                                    .put("rows", 4L)
                                    .put("index", 400.0D)
                                    .put("addRowsIndexConstant", 405.0D)
                                    .put("uniques", 1.0002442201269182D)
                                    .put("maxIndex", 100.0)
                                    .put("minIndex", 100.0)
                                    .build()
                    ),
                    new DimensionAndMetricValueExtractor(
                        ImmutableMap.<String, Object>builder()
                                    .put("provider", "")
                                    .put("rows", 2L)
                                    .put("index", 200.0D)
                                    .put("addRowsIndexConstant", 203.0D)
                                    .put("uniques", 0.0)
                                    .put("maxIndex", 100.0D)
                                    .put("minIndex", 100.0D)
                                    .build()
                    ),
                    new DimensionAndMetricValueExtractor(
                        ImmutableMap.<String, Object>builder()
                                    .put("provider", "total_market")
                                    .put("rows", 2L)
                                    .put("index", 200.0D)
                                    .put("addRowsIndexConstant", 203.0D)
                                    .put("uniques", 1.0002442201269182D)
                                    .put("maxIndex", 100.0D)
                                    .put("minIndex", 100.0D)
                                    .build()
                    )
                )
            )
        )
    );

    QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment);
    TestHelper.assertExpectedResults(expectedResults, runner.run(query));
  }

  @Test
  public void testFullOnSearch()
  {
    SearchQuery query = Druids.newSearchQueryBuilder()
                              .dataSource(dataSource)
                              .granularity(allGran)
                              .intervals(fullOnInterval)
                              .query("a")
                              .build();

    List<Result<SearchResultValue>> expectedResults = Arrays.asList(
        new Result<SearchResultValue>(
            new DateTime("2011-01-12T00:00:00.000Z"),
            new SearchResultValue(
                Arrays.<SearchHit>asList(
                    new SearchHit(placementishDimension, "a"),
                    new SearchHit(qualityDimension, "automotive"),
                    new SearchHit(placementDimension, "mezzanine"),
                    new SearchHit(providerDimension, "total_market")
                )
            )
        )
    );

    QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment);
    TestHelper.assertExpectedResults(expectedResults, runner.run(query));
  }

  @Test
  public void testTimeBoundary()
  {
    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
                                    .dataSource("testing")
                                    .build();

    List<Result<TimeBoundaryResultValue>> expectedResults = Arrays.asList(
        new Result<TimeBoundaryResultValue>(
            new DateTime("2011-01-12T00:00:00.000Z"),
            new TimeBoundaryResultValue(
                ImmutableMap.of(
                    TimeBoundaryQuery.MIN_TIME,
                    new DateTime("2011-01-12T00:00:00.000Z"),
                    TimeBoundaryQuery.MAX_TIME,
                    new DateTime("2011-01-13T00:00:00.000Z")
                )
            )
        )
    );

    QueryRunner runner = TestQueryRunners.makeTimeBoundaryQueryRunner(segment);
    TestHelper.assertExpectedResults(expectedResults, runner.run(query));
  }
}
24
processing/src/test/resources/append.json.1
Normal file
@ -0,0 +1,24 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z"
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "placement":"preferred",
    "placementish":["a", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-14T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T00:00:00.000Z",
    "placement":"preferred",
    "placementish":["a", "preferred"]
  }
]
34
processing/src/test/resources/append.json.2
Normal file
@ -0,0 +1,34 @@
[
  {
    "timestamp":"2011-01-14T22:00:00.000Z",
    "provider":"total_market",
    "placement":"preferred",
    "placementish":["h", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-14T23:00:00.000Z",
    "provider":"total_market",
    "quality":"business",
    "placement":"mezzanine",
    "placementish":["p", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T00:00:00.000Z",
    "placementish":"preferred",
    "provider":"spot",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T01:00:00.000Z",
    "placementish":["p", "preferred"],
    "placement":["q", "mezzanine"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T02:00:00.000Z",
    "placement":"preferred",
    "index":100.000000
  }
]
23
processing/src/test/resources/append.json.3
Normal file
@ -0,0 +1,23 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z"
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "placement":"preferred",
    "placementish":["a", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-14T00:00:00.000Z",
    "provider":"total_market",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T00:00:00.000Z",
    "placement":"preferred",
    "placementish":["a", "preferred"]
  }
]
28
processing/src/test/resources/append.json.4
Normal file
@ -0,0 +1,28 @@
[
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"total_market",
    "placement":"preferred",
    "placementish":["h", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-13T01:00:00.000Z",
    "provider":"total_market",
    "placement":"mezzanine",
    "placementish":["p", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-13T15:00:00.000Z",
    "placementish":"preferred",
    "provider":"spot",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-13T23:00:00.000Z",
    "placementish":["p", "preferred"],
    "placement":["q", "mezzanine"],
    "index":100.000000
  }
]
32
processing/src/test/resources/append.json.5
Normal file
@ -0,0 +1,32 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-14T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-16T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-17T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-19T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-21T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  }
]
12
processing/src/test/resources/append.json.6
Normal file
@ -0,0 +1,12 @@
[
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"breakstuff",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-15T00:00:00.000Z",
    "provider":"spot",
    "index":100.000000
  }
]
12
processing/src/test/resources/append.json.7
Normal file
@ -0,0 +1,12 @@
[
  {
    "timestamp":"2011-01-18T00:00:00.000Z",
    "provider":"spot",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-20T00:00:00.000Z",
    "provider":"spot",
    "index":100.000000
  }
]
66
processing/src/test/resources/druid.sample.json
Normal file
@ -0,0 +1,66 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z"
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "placement":"preferred",
    "placementish":["a", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placement":"preferred",
    "placementish":["a", "preferred"]
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"total_market",
    "placementish":["h", "preferred"],
    "placement":"preferred",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"total_market",
    "placementish":["p", "preferred"],
    "quality":"business",
    "placement":"mezzanine",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placementish":"preferred",
    "provider":"spot",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placementish":["p", "preferred"],
    "placement":["q", "mezzanine"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placement":"preferred",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":["", "spot"],
    "index":100.000000
  }
]
49
processing/src/test/resources/druid.sample.json.bottom
Normal file
@ -0,0 +1,49 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placement":"preferred",
    "placementish":["a", "preferred"]
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"total_market",
    "placement":"preferred",
    "placementish":["h", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-13T00:00:00.000Z",
    "provider":"total_market",
    "quality":"business",
    "placement":"mezzanine",
    "placementish":["p", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placementish":"preferred",
    "provider":"spot",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placementish":["p", "preferred"],
    "placement":["q", "mezzanine"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "placement":"preferred",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"",
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":["", "spot"],
    "index":100.000000
  }
]
19
processing/src/test/resources/druid.sample.json.top
Normal file
@ -0,0 +1,19 @@
[
  {
    "timestamp":"2011-01-12T00:00:00.000Z"
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "placement":"preferred",
    "placementish":["a", "preferred"],
    "index":100.000000
  },
  {
    "timestamp":"2011-01-12T00:00:00.000Z",
    "provider":"spot",
    "quality":"automotive",
    "index":100.000000
  }
]