From 4afc62be2964f31d593a7c99fc4feaf8f8478db2 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 16 Nov 2015 12:38:56 -0800 Subject: [PATCH] Allow IndexMerger to use non-lexicographic dim order when merging indexes --- .../java/io/druid/segment/IndexMerger.java | 72 +++--- .../io/druid/segment/IndexMergerTest.java | 208 +++++++++++++++++- 2 files changed, 243 insertions(+), 37 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 44b3bfe4b83..d8bc9d1a6d8 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -254,6 +254,38 @@ public class IndexMerger return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); } + private List getLexicographicMergedDimensions(List indexes) + { + return mergeIndexed( + Lists.transform( + indexes, + new Function>() + { + @Override + public Iterable apply(@Nullable IndexableAdapter input) + { + return input.getDimensionNames(); + } + } + ) + ); + } + + private List getMergedDimensions(List indexes) + { + if (indexes.size() == 0) { + return ImmutableList.of(); + } + Indexed dimOrder = indexes.get(0).getDimensionNames(); + for (IndexableAdapter index : indexes) { + Indexed dimOrder2 = index.getDimensionNames(); + if(!Iterators.elementsEqual(dimOrder.iterator(), dimOrder2.iterator())) { + return getLexicographicMergedDimensions(indexes); + } + } + return ImmutableList.copyOf(dimOrder); + } + public File merge( List indexes, final AggregatorFactory[] metricAggs, @@ -268,19 +300,8 @@ public class IndexMerger throw new ISE("Couldn't make outdir[%s].", outDir); } - final List mergedDimensions = mergeIndexed( - Lists.transform( - indexes, - new Function>() - { - @Override - public Iterable apply(@Nullable IndexableAdapter input) - { - return input.getDimensionNames(); - } - } - ) - ); + final List mergedDimensions = getMergedDimensions(indexes); + final List mergedMetrics = Lists.transform( mergeIndexed( Lists.newArrayList( @@ -408,29 +429,8 @@ public class IndexMerger throw new ISE("Couldn't make outdir[%s].", outDir); } - final List mergedDimensions = mergeIndexed( - Lists.transform( - indexes, - new Function>() - { - @Override - public Iterable apply(@Nullable IndexableAdapter input) - { - return Iterables.transform( - input.getDimensionNames(), - new Function() - { - @Override - public String apply(@Nullable String input) - { - return input; - } - } - ); - } - } - ) - ); + final List mergedDimensions = getMergedDimensions(indexes); + final List mergedMetrics = mergeIndexed( Lists.transform( indexes, diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index a33fa7656bc..edf540aec71 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -51,6 +51,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.File; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -631,7 +632,15 @@ public class IndexMergerTest "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" ); - QueryableIndex converted = closer.closeLater(INDEX_IO.loadIndex(INDEX_MERGER.convert(tempDir1, convertDir, newSpec))); + QueryableIndex converted = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.convert( + tempDir1, + convertDir, + newSpec + ) + ) + ); Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); @@ -664,4 +673,201 @@ public class IndexMergerTest Assert.assertEquals(expectedStrategy, strategy); } + + + @Test + public void testNonLexicographicDimOrderMerge() throws Exception + { + IncrementalIndex toPersist1 = getIndexD3(); + IncrementalIndex toPersist2 = getIndexD3(); + IncrementalIndex toPersist3 = getIndexD3(); + + final File tmpDir = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex index1 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist1, + tmpDir, + null, + indexSpec + ) + ) + ); + + QueryableIndex index2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist2, + tmpDir2, + null, + indexSpec + ) + ) + ); + + QueryableIndex index3 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist3, + tmpDir3, + null, + indexSpec + ) + ) + ); + + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(index1, index2, index3), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = new ArrayList<>(); + for (Rowboat boat : boats) { + boatList.add(boat); + } + + Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(3, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + } + + @Test + public void testDisjointDimMerge() throws Exception + { + IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); + IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistA, + tmpDirA, + null, + indexSpec + ) + ) + ); + + QueryableIndex indexB = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB, + tmpDirB, + null, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = new ArrayList<>(); + for (Rowboat boat : boats) { + boatList.add(boat); + } + + Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(5, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics()); + } + + private IncrementalIndex getIndexD3() throws Exception + { + IncrementalIndex toPersist1 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); + + toPersist1.add( + new MapBasedInputRow( + 1, + Arrays.asList("d3", "d1", "d2"), + ImmutableMap.of("d1", "100", "d2", "4000", "d3", "30000") + ) + ); + + toPersist1.add( + new MapBasedInputRow( + 1, + Arrays.asList("d3", "d1", "d2"), + ImmutableMap.of("d1", "200", "d2", "3000", "d3", "50000") + ) + ); + + toPersist1.add( + new MapBasedInputRow( + 1, + Arrays.asList("d3", "d1", "d2"), + ImmutableMap.of("d1", "300", "d2", "2000", "d3", "40000") + ) + ); + + return toPersist1; + } + + private IncrementalIndex getSingleDimIndex(String dimName, List values) throws Exception + { + IncrementalIndex toPersist1 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); + + for (String val : values) { + toPersist1.add( + new MapBasedInputRow( + 1, + Arrays.asList(dimName), + ImmutableMap.of(dimName, val) + ) + ); + } + + return toPersist1; + } }