From 1e44445f06ec799beed0b81c2cfe3994d9b4c5ee Mon Sep 17 00:00:00 2001 From: Michael Schiff Date: Tue, 12 Jan 2016 08:54:28 -0800 Subject: [PATCH] Adds support for empty merge metrics. fixes #2256 --- .../java/io/druid/segment/IndexMerger.java | 25 +++-- .../io/druid/segment/IndexMergerTest.java | 102 ++++++++++++++++++ 2 files changed, 121 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 593fa40c082..e72f100bbb3 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -350,7 +350,6 @@ public class IndexMerger } } ) - .concat(Arrays.>asList(new AggFactoryStringIndexed(metricAggs))) ) ), new Function() @@ -362,14 +361,28 @@ public class IndexMerger } } ); - if (mergedMetrics.size() != metricAggs.length) { - throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), metricAggs.length); - } final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()]; for (int i = 0; i < metricAggs.length; i++) { AggregatorFactory metricAgg = metricAggs[i]; - sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg; + int metricIndex = mergedMetrics.indexOf(metricAgg.getName()); + /* + If metricIndex is negative, one of the metricAggs was not present in the union of metrics from the indices + we are merging + */ + if (metricIndex > -1) { + sortedMetricAggs[metricIndex] = metricAgg; + } + } + + /* + If there is nothing at sortedMetricAggs[i], then we did not have a metricAgg whose name matched the name + of the ith element of mergedMetrics. I.e. There was a metric in the indices to merge that we did not ask for. + */ + for (int i = 0; i < sortedMetricAggs.length; i++) { + if (sortedMetricAggs[i] == null) { + throw new IAE("Indices to merge contained metric[%s], but requested metrics did not", mergedMetrics.get(i)); + } } for (int i = 0; i < mergedMetrics.size(); i++) { @@ -377,7 +390,7 @@ public class IndexMerger throw new IAE( "Metric mismatch, index[%d] [%s] != [%s]", i, - metricAggs[i].getName(), + sortedMetricAggs[i].getName(), mergedMetrics.get(i) ); } diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 716303a72ee..be404db8445 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.collections.bitmap.RoaringBitmapFactory; +import com.metamx.common.IAE; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.DimensionsSpec; import io.druid.granularity.QueryGranularity; @@ -1466,7 +1467,108 @@ public class IndexMergerTest tmpDirMerged, indexSpec ); + } + @Test + public void testMismatchedMetrics() throws IOException + { + IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("A", "A") + }); + closer.closeLater(index1); + + IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("A", "A"), + new LongSumAggregatorFactory("C", "C") + }); + closer.closeLater(index2); + + IncrementalIndex index3 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("B", "B") + }); + closer.closeLater(index3); + + IncrementalIndex index4 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("C", "C"), + new LongSumAggregatorFactory("A", "A"), + new LongSumAggregatorFactory("B", "B") + }); + closer.closeLater(index4); + + IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("C", "C"), + new LongSumAggregatorFactory("B", "B") + }); + closer.closeLater(index5); + + + Interval interval = new Interval(0, new DateTime().getMillis()); + RoaringBitmapFactory factory = new RoaringBitmapFactory(); + ArrayList toMerge = Lists.newArrayList( + new IncrementalIndexAdapter(interval, index1, factory), + new IncrementalIndexAdapter(interval, index2, factory), + new IncrementalIndexAdapter(interval, index3, factory), + new IncrementalIndexAdapter(interval, index4, factory), + new IncrementalIndexAdapter(interval, index5, factory) + ); + + final File tmpDirMerged = temporaryFolder.newFolder(); + + File merged = INDEX_MERGER.merge( + toMerge, + new AggregatorFactory[]{ + new LongSumAggregatorFactory("A", "A"), + new LongSumAggregatorFactory("B", "B"), + new LongSumAggregatorFactory("C", "C"), + new LongSumAggregatorFactory("D", "D") + }, + tmpDirMerged, + indexSpec + ); + + // Since D was not present in any of the indices, it is not present in the output + final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged))); + Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics())); + + } + + @Test(expected = IAE.class) + public void testMismatchedMetricsVarying() throws IOException + { + + IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("A", "A"), + new LongSumAggregatorFactory("C", "C") + }); + closer.closeLater(index2); + + IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{ + new LongSumAggregatorFactory("C", "C"), + new LongSumAggregatorFactory("B", "B") + }); + closer.closeLater(index5); + + + Interval interval = new Interval(0, new DateTime().getMillis()); + RoaringBitmapFactory factory = new RoaringBitmapFactory(); + ArrayList toMerge = Lists.newArrayList( + new IncrementalIndexAdapter(interval, index2, factory) + ); + + final File tmpDirMerged = temporaryFolder.newFolder(); + + final File merged = INDEX_MERGER.merge( + toMerge, + new AggregatorFactory[] { + new LongSumAggregatorFactory("B", "B"), + new LongSumAggregatorFactory("A", "A"), + new LongSumAggregatorFactory("D", "D") + }, + tmpDirMerged, + indexSpec + ); + final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged))); + Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics())); } private IncrementalIndex getIndexD3() throws Exception