From 55292bba13832ab8146d951264c0a0d2cd38bca9 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 28 May 2015 18:18:20 -0700 Subject: [PATCH] Add more IndexMergerTests --- .../java/io/druid/segment/IndexIOTest.java | 1 + .../java/io/druid/segment/IndexMakerTest.java | 1 + .../io/druid/segment/IndexMergerTest.java | 393 +++++++++++++----- 3 files changed, 301 insertions(+), 94 deletions(-) diff --git a/processing/src/test/java/io/druid/segment/IndexIOTest.java b/processing/src/test/java/io/druid/segment/IndexIOTest.java index 8975b039bb3..91b111306ba 100644 --- a/processing/src/test/java/io/druid/segment/IndexIOTest.java +++ b/processing/src/test/java/io/druid/segment/IndexIOTest.java @@ -65,6 +65,7 @@ public class IndexIOTest private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01"); private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( new ConciseBitmapSerdeFactory(), + CompressedObjectStrategy.CompressionStrategy.LZ4, CompressedObjectStrategy.CompressionStrategy.LZ4 ); diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMakerTest.java index d7ab4784102..ba131d92132 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerTest.java @@ -66,6 +66,7 @@ public class IndexMakerTest }; private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( new ConciseBitmapSerdeFactory(), + CompressedObjectStrategy.CompressionStrategy.LZ4, CompressedObjectStrategy.CompressionStrategy.LZ4 ); private static final List DIMS = ImmutableList.of("dim0", "dim1"); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index e77d027c9ac..07a5b0e4692 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -17,59 +17,86 @@ package io.druid.segment; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.io.Files; +import com.google.common.collect.Sets; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.column.Column; +import io.druid.segment.column.SimpleDictionaryEncodedColumn; import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.OnheapIncrementalIndex; -import junit.framework.Assert; -import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.File; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collection; +import java.util.List; @RunWith(Parameterized.class) public class IndexMergerTest { - @Parameterized.Parameters(name = "{index}: bitmap={0}, compression={1}") + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}") public static Collection data() { - return Arrays.asList( - new Object[][]{ - { null, null }, - { new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 }, - { new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 }, - { new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF}, - { new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF}, + return Collections2.transform( + Sets.cartesianProduct( + ImmutableList.of( + ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()), + ImmutableSet.of( + CompressedObjectStrategy.CompressionStrategy.LZ4, + CompressedObjectStrategy.CompressionStrategy.LZF + ), + ImmutableSet.of( + CompressedObjectStrategy.CompressionStrategy.LZ4, + CompressedObjectStrategy.CompressionStrategy.LZF + ) + ) + ), new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } } ); } static IndexSpec makeIndexSpec( BitmapSerdeFactory bitmapSerdeFactory, - CompressedObjectStrategy.CompressionStrategy compressionStrategy + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy ) { - if(bitmapSerdeFactory != null || compressionStrategy != null) { + if (bitmapSerdeFactory != null || compressionStrategy != null) { return new IndexSpec( bitmapSerdeFactory, compressionStrategy.name().toLowerCase(), - null + dimCompressionStrategy.name().toLowerCase() ); } else { return new IndexSpec(); @@ -78,9 +105,13 @@ public class IndexMergerTest private final IndexSpec indexSpec; - public IndexMergerTest(BitmapSerdeFactory bitmapSerdeFactory, CompressedObjectStrategy.CompressionStrategy compressionStrategy) + public IndexMergerTest( + BitmapSerdeFactory bitmapSerdeFactory, + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy + ) { - this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy); + this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy); } @Test @@ -91,17 +122,14 @@ public class IndexMergerTest IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); IncrementalIndexTest.populateIndex(timestamp, toPersist); - final File tempDir = Files.createTempDir(); - try { - QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)); + final File tempDir = temporaryFolder.newFolder(); + QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)); - Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); - Assert.assertEquals(3, index.getColumnNames().size()); - } - finally { - tempDir.delete(); - } + Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); + Assert.assertEquals(3, index.getColumnNames().size()); + + assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); } @Test @@ -111,7 +139,12 @@ public class IndexMergerTest IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); - IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); toPersist2.add( new MapBasedInputRow( @@ -129,87 +162,259 @@ public class IndexMergerTest ) ); - final File tempDir1 = Files.createTempDir(); - final File tempDir2 = Files.createTempDir(); - final File mergedDir = Files.createTempDir(); - try { - QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + final File tempDir1 = temporaryFolder.newFolder(); + final File tempDir2 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); + QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); - QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec)); + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); - Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); - Assert.assertEquals(3, index2.getColumnNames().size()); + QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec)); - QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex( - Arrays.asList(index1, index2), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec - ) - ); + Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); + Assert.assertEquals(3, index2.getColumnNames().size()); - Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - } - finally { - FileUtils.deleteQuietly(tempDir1); - FileUtils.deleteQuietly(tempDir2); - FileUtils.deleteQuietly(mergedDir); - } -} + QueryableIndex merged = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + Arrays.asList(index1, index2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ); + + Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } @Test public void testPersistEmptyColumn() throws Exception { - final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10); - final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10); - final File tmpDir1 = Files.createTempDir(); - final File tmpDir2 = Files.createTempDir(); - final File tmpDir3 = Files.createTempDir(); + final IncrementalIndex toPersist1 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{}, + 10 + ); + final IncrementalIndex toPersist2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{}, + 10 + ); + final File tmpDir1 = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); - try { - toPersist1.add( - new MapBasedInputRow( - 1L, - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "foo") - ) - ); + toPersist1.add( + new MapBasedInputRow( + 1L, + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "foo") + ) + ); - toPersist2.add( - new MapBasedInputRow( - 1L, - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "bar") - ) - ); + toPersist2.add( + new MapBasedInputRow( + 1L, + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "bar") + ) + ); - final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1, indexSpec)); - final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2, indexSpec)); - final QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec) - ); + final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1, indexSpec)); + final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2, indexSpec)); + final QueryableIndex merged = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec) + ); - Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions())); + Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions())); - Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); + Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); - Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); - } - finally { - FileUtils.deleteQuietly(tmpDir1); - FileUtils.deleteQuietly(tmpDir2); - FileUtils.deleteQuietly(tmpDir3); - } + Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testMergeRetainsValues() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + QueryableIndex merged = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + + @Test + public void testAppendRetainsValues() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = IndexIO.loadIndex( + IndexMerger.append( + ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + ) + ); + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + QueryableIndex merged = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testMergeSpecChange() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + IndexSpec newSpec = new IndexSpec(indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"); + + + QueryableIndex merged = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + newSpec + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, newSpec.getDimensionCompressionStrategy()); + } + + private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) throws Exception + { + // Java voodoo + + Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding(); + Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column"); + field.setAccessible(true); + + Object obj = field.get(encodedColumn); + Field compressedSupplierField = obj.getClass().getDeclaredField("this$0"); + compressedSupplierField.setAccessible(true); + + Object supplier = compressedSupplierField.get(obj); + Field compressionField = supplier.getClass().getDeclaredField("compression"); + compressionField.setAccessible(true); + + Object strategy = compressionField.get(supplier); + + Assert.assertEquals(expectedStrategy, strategy); } }