IngestSegmentFirehoseTest: Add more tests for reindexing. (#4285)

* IngestSegmentFirehoseTest: Add more tests for reindexing.

* Nix unused imports.
This commit is contained in:
Gian Merlino 2017-05-17 14:12:26 +09:00 committed by Fangjin Yang
parent 51872fd310
commit 22f20f2207
1 changed files with 117 additions and 42 deletions

View File

@ -22,13 +22,20 @@ package io.druid.segment.realtime.firehose;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.collections.spatial.search.RadiusBound;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.NewSpatialDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.filter.SpatialDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
@ -37,7 +44,11 @@ import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -50,6 +61,33 @@ import java.util.List;
*/
public class IngestSegmentFirehoseTest
{
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("host"),
new NewSpatialDimensionSchema("spatial", ImmutableList.of("x", "y"))
),
null,
null
);
private static final DimensionsSpec DIMENSIONS_SPEC_REINDEX = new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("host"),
new NewSpatialDimensionSchema("spatial", ImmutableList.of("spatial"))
),
null,
null
);
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(
new LongSumAggregatorFactory("visited_sum", "visited"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
);
private static final List<AggregatorFactory> AGGREGATORS_REINDEX = ImmutableList.of(
new LongSumAggregatorFactory("visited_sum", "visited_sum"),
new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
);
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@ -58,80 +96,117 @@ public class IngestSegmentFirehoseTest
private IndexMerger indexMerger = TestHelper.getTestIndexMerger();
@Test
public void testSanity() throws Exception
public void testReadFromIndexAndWriteAnotherIndex() throws Exception
{
// Tests a "reindexing" use case that is a common use of ingestSegment.
File segmentDir = tempFolder.newFolder();
createTestIndex(segmentDir);
QueryableIndex qi = null;
try {
qi = indexIO.loadIndex(segmentDir);
StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
IngestSegmentFirehose firehose = new IngestSegmentFirehose(
try (
final QueryableIndex qi = indexIO.loadIndex(segmentDir);
final IncrementalIndex index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(DIMENSIONS_SPEC_REINDEX)
.withQueryGranularity(Granularities.NONE)
.withMetrics(AGGREGATORS_REINDEX.toArray(new AggregatorFactory[]{}))
.build(),
true,
5000
)
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
final IngestSegmentFirehose firehose = new IngestSegmentFirehose(
ImmutableList.of(wsa, wsa),
ImmutableList.of("host"),
ImmutableList.of("host", "spatial"),
ImmutableList.of("visited_sum", "unique_hosts"),
null
);
int count = 0;
while (firehose.hasMore()) {
firehose.nextRow();
final InputRow row = firehose.nextRow();
Assert.assertNotNull(row);
if (count == 0) {
Assert.assertEquals(new DateTime("2014-10-22T00Z"), row.getTimestamp());
Assert.assertEquals("host1", row.getRaw("host"));
Assert.assertEquals("0,1", row.getRaw("spatial"));
Assert.assertEquals(10L, row.getRaw("visited_sum"));
Assert.assertEquals(1.0d, ((HyperLogLogCollector) row.getRaw("unique_hosts")).estimateCardinality(), 0.1);
}
count++;
index.add(row);
}
Assert.assertEquals(18, count);
}
finally {
if (qi != null) {
qi.close();
}
// Check the index
Assert.assertEquals(9, index.size());
final IncrementalIndexStorageAdapter queryable = new IncrementalIndexStorageAdapter(index);
Assert.assertEquals(2, queryable.getAvailableDimensions().size());
Assert.assertEquals("host", queryable.getAvailableDimensions().get(0));
Assert.assertEquals("spatial", queryable.getAvailableDimensions().get(1));
Assert.assertEquals(ImmutableList.of("visited_sum", "unique_hosts"), queryable.getAvailableMetrics());
// Do a spatial filter
final IngestSegmentFirehose firehose2 = new IngestSegmentFirehose(
ImmutableList.of(new WindowedStorageAdapter(queryable, new Interval("2000/3000"))),
ImmutableList.of("host", "spatial"),
ImmutableList.of("visited_sum", "unique_hosts"),
new SpatialDimFilter("spatial", new RadiusBound(new float[]{1, 0}, 0.1f))
);
final InputRow row = firehose2.nextRow();
Assert.assertFalse(firehose2.hasMore());
Assert.assertEquals(new DateTime("2014-10-22T00Z"), row.getTimestamp());
Assert.assertEquals("host2", row.getRaw("host"));
Assert.assertEquals("1,0", row.getRaw("spatial"));
Assert.assertEquals(40L, row.getRaw("visited_sum"));
Assert.assertEquals(1.0d, ((HyperLogLogCollector) row.getRaw("unique_hosts")).estimateCardinality(), 0.1);
}
}
private void createTestIndex(File segmentDir) throws Exception
{
List<String> rows = Lists.newArrayList(
"2014102200,host1,10",
"2014102200,host2,20",
"2014102200,host3,30",
"2014102201,host1,10",
"2014102201,host2,20",
"2014102201,host3,30",
"2014102202,host1,10",
"2014102202,host2,20",
"2014102202,host3,30"
final List<String> rows = Lists.newArrayList(
"2014102200\thost1\t10\t0\t1",
"2014102200\thost2\t20\t1\t0",
"2014102200\thost3\t30\t1\t1",
"2014102201\thost1\t10\t1\t1",
"2014102201\thost2\t20\t1\t1",
"2014102201\thost3\t30\t1\t1",
"2014102202\thost1\t10\t1\t1",
"2014102202\thost2\t20\t1\t1",
"2014102202\thost3\t30\t1\t1"
);
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
final StringInputRowParser parser = new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
DIMENSIONS_SPEC,
"\t",
null,
ImmutableList.of("timestamp", "host", "visited"),
ImmutableList.of("timestamp", "host", "visited", "x", "y", "spatial"),
false,
0
),
Charsets.UTF_8.toString()
);
AggregatorFactory[] aggregators = new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
};
IncrementalIndex index = null;
try {
index = new OnheapIncrementalIndex(0, Granularities.NONE, aggregators, true, true, true, 5000);
try (
final IncrementalIndex index = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(parser.getParseSpec().getDimensionsSpec())
.withQueryGranularity(Granularities.NONE)
.withMetrics(AGGREGATORS.toArray(new AggregatorFactory[]{}))
.build(),
true,
5000
)
) {
for (String line : rows) {
index.add(parser.parse(line));
}
indexMerger.persist(index, segmentDir, new IndexSpec());
}
finally {
if (index != null) {
index.close();
}
}
}
}