Merge pull request #1402 from metamx/IndexMergerTests

Add more IndexMergerTests
This commit is contained in:
Fangjin Yang 2015-05-28 21:15:18 -07:00
commit 069fc25528
3 changed files with 301 additions and 94 deletions

View File

@ -65,6 +65,7 @@ public class IndexIOTest
private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01"); private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01");
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
new ConciseBitmapSerdeFactory(), new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZ4 CompressedObjectStrategy.CompressionStrategy.LZ4
); );

View File

@ -66,6 +66,7 @@ public class IndexMakerTest
}; };
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
new ConciseBitmapSerdeFactory(), new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZ4 CompressedObjectStrategy.CompressionStrategy.LZ4
); );
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1"); private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");

View File

@ -17,59 +17,86 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Files; import com.google.common.collect.Sets;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.Column; import io.druid.segment.column.Column;
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert; import org.junit.Assert;
import org.apache.commons.io.FileUtils; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.lang.reflect.Field;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class IndexMergerTest public class IndexMergerTest
{ {
@Parameterized.Parameters(name = "{index}: bitmap={0}, compression={1}") @Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}")
public static Collection<Object[]> data() public static Collection<Object[]> data()
{ {
return Arrays.asList( return Collections2.transform(
new Object[][]{ Sets.cartesianProduct(
{ null, null }, ImmutableList.of(
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 }, ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()),
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 }, ImmutableSet.of(
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF}, CompressedObjectStrategy.CompressionStrategy.LZ4,
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF}, CompressedObjectStrategy.CompressionStrategy.LZF
),
ImmutableSet.of(
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZF
)
)
), new Function<List<?>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<?> input)
{
return input.toArray();
}
} }
); );
} }
static IndexSpec makeIndexSpec( static IndexSpec makeIndexSpec(
BitmapSerdeFactory bitmapSerdeFactory, BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy CompressedObjectStrategy.CompressionStrategy compressionStrategy,
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
) )
{ {
if (bitmapSerdeFactory != null || compressionStrategy != null) { if (bitmapSerdeFactory != null || compressionStrategy != null) {
return new IndexSpec( return new IndexSpec(
bitmapSerdeFactory, bitmapSerdeFactory,
compressionStrategy.name().toLowerCase(), compressionStrategy.name().toLowerCase(),
null dimCompressionStrategy.name().toLowerCase()
); );
} else { } else {
return new IndexSpec(); return new IndexSpec();
@ -78,9 +105,13 @@ public class IndexMergerTest
private final IndexSpec indexSpec; private final IndexSpec indexSpec;
public IndexMergerTest(BitmapSerdeFactory bitmapSerdeFactory, CompressedObjectStrategy.CompressionStrategy compressionStrategy) public IndexMergerTest(
BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
)
{ {
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy); this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy);
} }
@Test @Test
@ -91,17 +122,14 @@ public class IndexMergerTest
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist); IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = Files.createTempDir(); final File tempDir = temporaryFolder.newFolder();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)); QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec));
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(3, index.getColumnNames().size()); Assert.assertEquals(3, index.getColumnNames().size());
}
finally { assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
tempDir.delete();
}
} }
@Test @Test
@ -111,7 +139,12 @@ public class IndexMergerTest
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1); IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000); IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
toPersist2.add( toPersist2.add(
new MapBasedInputRow( new MapBasedInputRow(
@ -129,10 +162,10 @@ public class IndexMergerTest
) )
); );
final File tempDir1 = Files.createTempDir(); final File tempDir1 = temporaryFolder.newFolder();
final File tempDir2 = Files.createTempDir(); final File tempDir2 = temporaryFolder.newFolder();
final File mergedDir = Files.createTempDir(); final File mergedDir = temporaryFolder.newFolder();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
@ -157,24 +190,30 @@ public class IndexMergerTest
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size()); Assert.assertEquals(3, merged.getColumnNames().size());
} assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
finally { assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
FileUtils.deleteQuietly(tempDir1); assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
FileUtils.deleteQuietly(tempDir2);
FileUtils.deleteQuietly(mergedDir);
}
} }
@Test @Test
public void testPersistEmptyColumn() throws Exception public void testPersistEmptyColumn() throws Exception
{ {
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10); final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10); 0L,
final File tmpDir1 = Files.createTempDir(); QueryGranularity.NONE,
final File tmpDir2 = Files.createTempDir(); new AggregatorFactory[]{},
final File tmpDir3 = Files.createTempDir(); 10
);
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{},
10
);
final File tmpDir1 = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
try {
toPersist1.add( toPersist1.add(
new MapBasedInputRow( new MapBasedInputRow(
1L, 1L,
@ -205,11 +244,177 @@ public class IndexMergerTest
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
} }
finally {
FileUtils.deleteQuietly(tmpDir1); @Test
FileUtils.deleteQuietly(tmpDir2); public void testMergeRetainsValues() throws Exception
FileUtils.deleteQuietly(tmpDir3); {
} final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testAppendRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = IndexIO.loadIndex(
IndexMerger.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testMergeSpecChange() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
IndexSpec newSpec = new IndexSpec(indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4");
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
newSpec
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, newSpec.getDimensionCompressionStrategy());
}
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) throws Exception
{
// Java voodoo
Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
field.setAccessible(true);
Object obj = field.get(encodedColumn);
Field compressedSupplierField = obj.getClass().getDeclaredField("this$0");
compressedSupplierField.setAccessible(true);
Object supplier = compressedSupplierField.get(obj);
Field compressionField = supplier.getClass().getDeclaredField("compression");
compressionField.setAccessible(true);
Object strategy = compressionField.get(supplier);
Assert.assertEquals(expectedStrategy, strategy);
} }
} }