Allow IndexMerger to use non-lexicographic dim order when merging indexes

This commit is contained in:
jon-wei 2015-11-16 12:38:56 -08:00
parent acc1a215c7
commit 4afc62be29
2 changed files with 243 additions and 37 deletions

View File

@ -254,6 +254,38 @@ public class IndexMerger
return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
} }
private List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
{
return mergeIndexed(
Lists.transform(
indexes,
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return input.getDimensionNames();
}
}
)
);
}
private List<String> getMergedDimensions(List<IndexableAdapter> indexes)
{
if (indexes.size() == 0) {
return ImmutableList.of();
}
Indexed<String> dimOrder = indexes.get(0).getDimensionNames();
for (IndexableAdapter index : indexes) {
Indexed<String> dimOrder2 = index.getDimensionNames();
if(!Iterators.elementsEqual(dimOrder.iterator(), dimOrder2.iterator())) {
return getLexicographicMergedDimensions(indexes);
}
}
return ImmutableList.copyOf(dimOrder);
}
public File merge( public File merge(
List<IndexableAdapter> indexes, List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs, final AggregatorFactory[] metricAggs,
@ -268,19 +300,8 @@ public class IndexMerger
throw new ISE("Couldn't make outdir[%s].", outDir); throw new ISE("Couldn't make outdir[%s].", outDir);
} }
final List<String> mergedDimensions = mergeIndexed( final List<String> mergedDimensions = getMergedDimensions(indexes);
Lists.transform(
indexes,
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return input.getDimensionNames();
}
}
)
);
final List<String> mergedMetrics = Lists.transform( final List<String> mergedMetrics = Lists.transform(
mergeIndexed( mergeIndexed(
Lists.newArrayList( Lists.newArrayList(
@ -408,29 +429,8 @@ public class IndexMerger
throw new ISE("Couldn't make outdir[%s].", outDir); throw new ISE("Couldn't make outdir[%s].", outDir);
} }
final List<String> mergedDimensions = mergeIndexed( final List<String> mergedDimensions = getMergedDimensions(indexes);
Lists.transform(
indexes,
new Function<IndexableAdapter, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable IndexableAdapter input)
{
return Iterables.transform(
input.getDimensionNames(),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return input;
}
}
);
}
}
)
);
final List<String> mergedMetrics = mergeIndexed( final List<String> mergedMetrics = mergeIndexed(
Lists.transform( Lists.transform(
indexes, indexes,

View File

@ -51,6 +51,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -631,7 +632,15 @@ public class IndexMergerTest
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
); );
QueryableIndex converted = closer.closeLater(INDEX_IO.loadIndex(INDEX_MERGER.convert(tempDir1, convertDir, newSpec))); QueryableIndex converted = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.convert(
tempDir1,
convertDir,
newSpec
)
)
);
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
@ -664,4 +673,201 @@ public class IndexMergerTest
Assert.assertEquals(expectedStrategy, strategy); Assert.assertEquals(expectedStrategy, strategy);
} }
@Test
public void testNonLexicographicDimOrderMerge() throws Exception
{
IncrementalIndex toPersist1 = getIndexD3();
IncrementalIndex toPersist2 = getIndexD3();
IncrementalIndex toPersist3 = getIndexD3();
final File tmpDir = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist1,
tmpDir,
null,
indexSpec
)
)
);
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
null,
indexSpec
)
)
);
QueryableIndex index3 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist3,
tmpDir3,
null,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
}
@Test
public void testDisjointDimMerge() throws Exception
{
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
null,
indexSpec
)
)
);
QueryableIndex indexB = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
null,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics());
}
private IncrementalIndex getIndexD3() throws Exception
{
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
toPersist1.add(
new MapBasedInputRow(
1,
Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "100", "d2", "4000", "d3", "30000")
)
);
toPersist1.add(
new MapBasedInputRow(
1,
Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "200", "d2", "3000", "d3", "50000")
)
);
toPersist1.add(
new MapBasedInputRow(
1,
Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "300", "d2", "2000", "d3", "40000")
)
);
return toPersist1;
}
private IncrementalIndex getSingleDimIndex(String dimName, List<String> values) throws Exception
{
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
for (String val : values) {
toPersist1.add(
new MapBasedInputRow(
1,
Arrays.asList(dimName),
ImmutableMap.<String, Object>of(dimName, val)
)
);
}
return toPersist1;
}
} }