mirror of https://github.com/apache/druid.git
Merge pull request #1974 from jon-wei/dim_order_merge
Allow IndexMerger to use non-lexicographic dim order when merging indexes
This commit is contained in:
commit
d93640bfcb
|
@ -254,6 +254,38 @@ public class IndexMerger
|
|||
return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
private List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
|
||||
{
|
||||
return mergeIndexed(
|
||||
Lists.transform(
|
||||
indexes,
|
||||
new Function<IndexableAdapter, Iterable<String>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<String> apply(@Nullable IndexableAdapter input)
|
||||
{
|
||||
return input.getDimensionNames();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private List<String> getMergedDimensions(List<IndexableAdapter> indexes)
|
||||
{
|
||||
if (indexes.size() == 0) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
Indexed<String> dimOrder = indexes.get(0).getDimensionNames();
|
||||
for (IndexableAdapter index : indexes) {
|
||||
Indexed<String> dimOrder2 = index.getDimensionNames();
|
||||
if(!Iterators.elementsEqual(dimOrder.iterator(), dimOrder2.iterator())) {
|
||||
return getLexicographicMergedDimensions(indexes);
|
||||
}
|
||||
}
|
||||
return ImmutableList.copyOf(dimOrder);
|
||||
}
|
||||
|
||||
public File merge(
|
||||
List<IndexableAdapter> indexes,
|
||||
final AggregatorFactory[] metricAggs,
|
||||
|
@ -268,19 +300,8 @@ public class IndexMerger
|
|||
throw new ISE("Couldn't make outdir[%s].", outDir);
|
||||
}
|
||||
|
||||
final List<String> mergedDimensions = mergeIndexed(
|
||||
Lists.transform(
|
||||
indexes,
|
||||
new Function<IndexableAdapter, Iterable<String>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<String> apply(@Nullable IndexableAdapter input)
|
||||
{
|
||||
return input.getDimensionNames();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
final List<String> mergedDimensions = getMergedDimensions(indexes);
|
||||
|
||||
final List<String> mergedMetrics = Lists.transform(
|
||||
mergeIndexed(
|
||||
Lists.newArrayList(
|
||||
|
@ -408,29 +429,8 @@ public class IndexMerger
|
|||
throw new ISE("Couldn't make outdir[%s].", outDir);
|
||||
}
|
||||
|
||||
final List<String> mergedDimensions = mergeIndexed(
|
||||
Lists.transform(
|
||||
indexes,
|
||||
new Function<IndexableAdapter, Iterable<String>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<String> apply(@Nullable IndexableAdapter input)
|
||||
{
|
||||
return Iterables.transform(
|
||||
input.getDimensionNames(),
|
||||
new Function<String, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable String input)
|
||||
{
|
||||
return input;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
final List<String> mergedDimensions = getMergedDimensions(indexes);
|
||||
|
||||
final List<String> mergedMetrics = mergeIndexed(
|
||||
Lists.transform(
|
||||
indexes,
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.junit.runners.Parameterized;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -631,7 +632,15 @@ public class IndexMergerTest
|
|||
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
|
||||
);
|
||||
|
||||
QueryableIndex converted = closer.closeLater(INDEX_IO.loadIndex(INDEX_MERGER.convert(tempDir1, convertDir, newSpec)));
|
||||
QueryableIndex converted = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.convert(
|
||||
tempDir1,
|
||||
convertDir,
|
||||
newSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
|
||||
|
@ -664,4 +673,201 @@ public class IndexMergerTest
|
|||
|
||||
Assert.assertEquals(expectedStrategy, strategy);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNonLexicographicDimOrderMerge() throws Exception
|
||||
{
|
||||
IncrementalIndex toPersist1 = getIndexD3();
|
||||
IncrementalIndex toPersist2 = getIndexD3();
|
||||
IncrementalIndex toPersist3 = getIndexD3();
|
||||
|
||||
final File tmpDir = temporaryFolder.newFolder();
|
||||
final File tmpDir2 = temporaryFolder.newFolder();
|
||||
final File tmpDir3 = temporaryFolder.newFolder();
|
||||
final File tmpDirMerged = temporaryFolder.newFolder();
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.persist(
|
||||
toPersist1,
|
||||
tmpDir,
|
||||
null,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
QueryableIndex index2 = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.persist(
|
||||
toPersist2,
|
||||
tmpDir2,
|
||||
null,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
QueryableIndex index3 = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.persist(
|
||||
toPersist3,
|
||||
tmpDir3,
|
||||
null,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
final QueryableIndex merged = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.mergeQueryableIndex(
|
||||
Arrays.asList(index1, index2, index3),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
tmpDirMerged,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
|
||||
Iterable<Rowboat> boats = adapter.getRows();
|
||||
List<Rowboat> boatList = new ArrayList<>();
|
||||
for (Rowboat boat : boats) {
|
||||
boatList.add(boat);
|
||||
}
|
||||
|
||||
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
|
||||
Assert.assertEquals(3, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisjointDimMerge() throws Exception
|
||||
{
|
||||
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
|
||||
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
|
||||
|
||||
final File tmpDirA = temporaryFolder.newFolder();
|
||||
final File tmpDirB = temporaryFolder.newFolder();
|
||||
final File tmpDirMerged = temporaryFolder.newFolder();
|
||||
|
||||
QueryableIndex indexA = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.persist(
|
||||
toPersistA,
|
||||
tmpDirA,
|
||||
null,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
QueryableIndex indexB = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.persist(
|
||||
toPersistB,
|
||||
tmpDirB,
|
||||
null,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
final QueryableIndex merged = closer.closeLater(
|
||||
INDEX_IO.loadIndex(
|
||||
INDEX_MERGER.mergeQueryableIndex(
|
||||
Arrays.asList(indexA, indexB),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
tmpDirMerged,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
|
||||
Iterable<Rowboat> boats = adapter.getRows();
|
||||
List<Rowboat> boatList = new ArrayList<>();
|
||||
for (Rowboat boat : boats) {
|
||||
boatList.add(boat);
|
||||
}
|
||||
|
||||
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames()));
|
||||
Assert.assertEquals(5, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(2).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(3).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(3).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics());
|
||||
}
|
||||
|
||||
private IncrementalIndex getIndexD3() throws Exception
|
||||
{
|
||||
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
1000
|
||||
);
|
||||
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList("d3", "d1", "d2"),
|
||||
ImmutableMap.<String, Object>of("d1", "100", "d2", "4000", "d3", "30000")
|
||||
)
|
||||
);
|
||||
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList("d3", "d1", "d2"),
|
||||
ImmutableMap.<String, Object>of("d1", "200", "d2", "3000", "d3", "50000")
|
||||
)
|
||||
);
|
||||
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList("d3", "d1", "d2"),
|
||||
ImmutableMap.<String, Object>of("d1", "300", "d2", "2000", "d3", "40000")
|
||||
)
|
||||
);
|
||||
|
||||
return toPersist1;
|
||||
}
|
||||
|
||||
private IncrementalIndex getSingleDimIndex(String dimName, List<String> values) throws Exception
|
||||
{
|
||||
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
1000
|
||||
);
|
||||
|
||||
for (String val : values) {
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList(dimName),
|
||||
ImmutableMap.<String, Object>of(dimName, val)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return toPersist1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue