Merge pull request #2324 from himanshug/ut

unit test: adding single threaded indexing and querying test for IncrementalIndex
This commit is contained in:
Fangjin Yang 2016-01-23 15:25:41 -08:00
commit fc32e34e1e
1 changed files with 107 additions and 0 deletions

View File

@ -22,6 +22,7 @@ package io.druid.segment.data;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -221,6 +222,106 @@ public class IncrementalIndexTest
Assert.assertEquals(timestamp, row.getTimestampFromEpoch()); Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1")); Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2")); Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
index.close();
}
@Test
public void testSingleThreadedIndexingAndQuery() throws Exception
{
final int dimensionCount = 5;
final ArrayList<AggregatorFactory> ingestAggregatorFactories = new ArrayList<>();
ingestAggregatorFactories.add(new CountAggregatorFactory("rows"));
for (int i = 0; i < dimensionCount; ++i) {
ingestAggregatorFactories.add(
new LongSumAggregatorFactory(
String.format("sumResult%s", i),
String.format("Dim_%s", i)
)
);
ingestAggregatorFactories.add(
new DoubleSumAggregatorFactory(
String.format("doubleSumResult%s", i),
String.format("Dim_%s", i)
)
);
}
final IncrementalIndex index = indexCreator.createIndex(
ingestAggregatorFactories.toArray(
new AggregatorFactory[ingestAggregatorFactories.size()]
)
);
final long timestamp = System.currentTimeMillis();
final int rows = 50;
//ingesting same data twice to have some merging happening
for (int i = 0; i < rows; i++) {
index.add(getLongRow(timestamp + i, i, dimensionCount));
}
for (int i = 0; i < rows; i++) {
index.add(getLongRow(timestamp + i, i, dimensionCount));
}
//run a timeseries query on the index and verify results
final ArrayList<AggregatorFactory> queryAggregatorFactories = new ArrayList<>();
queryAggregatorFactories.add(new CountAggregatorFactory("rows"));
for (int i = 0; i < dimensionCount; ++i) {
queryAggregatorFactories.add(
new LongSumAggregatorFactory(
String.format("sumResult%s", i),
String.format("sumResult%s", i)
)
);
queryAggregatorFactories.add(
new DoubleSumAggregatorFactory(
String.format("doubleSumResult%s", i),
String.format("doubleSumResult%s", i)
)
);
}
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("xxx")
.granularity(QueryGranularity.ALL)
.intervals(ImmutableList.of(new Interval("2000/2030")))
.aggregators(queryAggregatorFactories)
.build();
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final QueryRunner<Result<TimeseriesResultValue>> runner = new FinalizeResultsQueryRunner<Result<TimeseriesResultValue>>(
factory.createRunner(incrementalIndexSegment),
factory.getToolchest()
);
List<Result<TimeseriesResultValue>> results = Sequences.toList(
runner.run(query, new HashMap<String, Object>()),
new LinkedList<Result<TimeseriesResultValue>>()
);
Result<TimeseriesResultValue> result = Iterables.getOnlyElement(results);
Assert.assertEquals(rows, result.getValue().getLongMetric("rows").intValue());
for (int i = 0; i < dimensionCount; ++i) {
Assert.assertEquals(
String.format("Failed long sum on dimension %d", i),
2*rows,
result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue()
);
Assert.assertEquals(
String.format("Failed double sum on dimension %d", i),
2*rows,
result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue()
);
}
index.close();
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
@ -436,6 +537,8 @@ public class IncrementalIndexTest
); );
} }
} }
index.close();
} }
@Test @Test
@ -481,6 +584,8 @@ public class IncrementalIndexTest
curr++; curr++;
} }
Assert.assertEquals(elementsPerThread, curr); Assert.assertEquals(elementsPerThread, curr);
index.close();
} }
@Test @Test
@ -507,5 +612,7 @@ public class IncrementalIndexTest
1000000 1000000
); );
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
incrementalIndex.close();
} }
} }