mirror of https://github.com/apache/druid.git
Add more IndexMergerTests
This commit is contained in:
parent
2fb2b6ce6b
commit
55292bba13
|
@ -65,6 +65,7 @@ public class IndexIOTest
|
|||
private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01");
|
||||
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
|
||||
new ConciseBitmapSerdeFactory(),
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4
|
||||
);
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ public class IndexMakerTest
|
|||
};
|
||||
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
|
||||
new ConciseBitmapSerdeFactory(),
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4
|
||||
);
|
||||
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");
|
||||
|
|
|
@ -17,59 +17,86 @@
|
|||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.collect.Sets;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
import io.druid.segment.data.CompressedObjectStrategy;
|
||||
import io.druid.segment.data.ConciseBitmapSerdeFactory;
|
||||
import io.druid.segment.data.IncrementalIndexTest;
|
||||
import io.druid.segment.data.RoaringBitmapSerdeFactory;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class IndexMergerTest
|
||||
{
|
||||
@Parameterized.Parameters(name = "{index}: bitmap={0}, compression={1}")
|
||||
@Rule
|
||||
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}")
|
||||
public static Collection<Object[]> data()
|
||||
{
|
||||
return Arrays.asList(
|
||||
new Object[][]{
|
||||
{ null, null },
|
||||
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 },
|
||||
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 },
|
||||
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF},
|
||||
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF},
|
||||
return Collections2.transform(
|
||||
Sets.cartesianProduct(
|
||||
ImmutableList.of(
|
||||
ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()),
|
||||
ImmutableSet.of(
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZF
|
||||
),
|
||||
ImmutableSet.of(
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZF
|
||||
)
|
||||
)
|
||||
), new Function<List<?>, Object[]>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Object[] apply(List<?> input)
|
||||
{
|
||||
return input.toArray();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
static IndexSpec makeIndexSpec(
|
||||
BitmapSerdeFactory bitmapSerdeFactory,
|
||||
CompressedObjectStrategy.CompressionStrategy compressionStrategy
|
||||
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
|
||||
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
|
||||
)
|
||||
{
|
||||
if (bitmapSerdeFactory != null || compressionStrategy != null) {
|
||||
return new IndexSpec(
|
||||
bitmapSerdeFactory,
|
||||
compressionStrategy.name().toLowerCase(),
|
||||
null
|
||||
dimCompressionStrategy.name().toLowerCase()
|
||||
);
|
||||
} else {
|
||||
return new IndexSpec();
|
||||
|
@ -78,9 +105,13 @@ public class IndexMergerTest
|
|||
|
||||
private final IndexSpec indexSpec;
|
||||
|
||||
public IndexMergerTest(BitmapSerdeFactory bitmapSerdeFactory, CompressedObjectStrategy.CompressionStrategy compressionStrategy)
|
||||
public IndexMergerTest(
|
||||
BitmapSerdeFactory bitmapSerdeFactory,
|
||||
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
|
||||
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
|
||||
)
|
||||
{
|
||||
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy);
|
||||
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -91,17 +122,14 @@ public class IndexMergerTest
|
|||
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist);
|
||||
|
||||
final File tempDir = Files.createTempDir();
|
||||
try {
|
||||
final File tempDir = temporaryFolder.newFolder();
|
||||
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec));
|
||||
|
||||
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index.getColumnNames().size());
|
||||
}
|
||||
finally {
|
||||
tempDir.delete();
|
||||
}
|
||||
|
||||
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -111,7 +139,12 @@ public class IndexMergerTest
|
|||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000);
|
||||
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
1000
|
||||
);
|
||||
|
||||
toPersist2.add(
|
||||
new MapBasedInputRow(
|
||||
|
@ -129,10 +162,10 @@ public class IndexMergerTest
|
|||
)
|
||||
);
|
||||
|
||||
final File tempDir1 = Files.createTempDir();
|
||||
final File tempDir2 = Files.createTempDir();
|
||||
final File mergedDir = Files.createTempDir();
|
||||
try {
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File tempDir2 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -157,24 +190,30 @@ public class IndexMergerTest
|
|||
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
}
|
||||
finally {
|
||||
FileUtils.deleteQuietly(tempDir1);
|
||||
FileUtils.deleteQuietly(tempDir2);
|
||||
FileUtils.deleteQuietly(mergedDir);
|
||||
}
|
||||
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistEmptyColumn() throws Exception
|
||||
{
|
||||
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10);
|
||||
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}, 10);
|
||||
final File tmpDir1 = Files.createTempDir();
|
||||
final File tmpDir2 = Files.createTempDir();
|
||||
final File tmpDir3 = Files.createTempDir();
|
||||
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{},
|
||||
10
|
||||
);
|
||||
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{},
|
||||
10
|
||||
);
|
||||
final File tmpDir1 = temporaryFolder.newFolder();
|
||||
final File tmpDir2 = temporaryFolder.newFolder();
|
||||
final File tmpDir3 = temporaryFolder.newFolder();
|
||||
|
||||
try {
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1L,
|
||||
|
@ -205,11 +244,177 @@ public class IndexMergerTest
|
|||
|
||||
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
finally {
|
||||
FileUtils.deleteQuietly(tmpDir1);
|
||||
FileUtils.deleteQuietly(tmpDir2);
|
||||
FileUtils.deleteQuietly(tmpDir3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeRetainsValues() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAppendRetainsValues() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(
|
||||
IndexMerger.append(
|
||||
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
|
||||
)
|
||||
);
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeSpecChange() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
IndexSpec newSpec = new IndexSpec(indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4");
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
newSpec
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, newSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) throws Exception
|
||||
{
|
||||
// Java voodoo
|
||||
|
||||
Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
|
||||
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
|
||||
field.setAccessible(true);
|
||||
|
||||
Object obj = field.get(encodedColumn);
|
||||
Field compressedSupplierField = obj.getClass().getDeclaredField("this$0");
|
||||
compressedSupplierField.setAccessible(true);
|
||||
|
||||
Object supplier = compressedSupplierField.get(obj);
|
||||
Field compressionField = supplier.getClass().getDeclaredField("compression");
|
||||
compressionField.setAccessible(true);
|
||||
|
||||
Object strategy = compressionField.get(supplier);
|
||||
|
||||
Assert.assertEquals(expectedStrategy, strategy);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue