Merge pull request #2257 from tubemogul/index-merge-bug

Adds support for empty merge metrics. fixes #2256
This commit is contained in:
Gian Merlino 2016-01-21 16:38:00 -08:00
commit e5913be90e
2 changed files with 121 additions and 6 deletions

View File

@ -350,7 +350,6 @@ public class IndexMerger
}
}
)
.concat(Arrays.<Iterable<String>>asList(new AggFactoryStringIndexed(metricAggs)))
)
),
new Function<String, String>()
@ -362,14 +361,28 @@ public class IndexMerger
}
}
);
if (mergedMetrics.size() != metricAggs.length) {
throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), metricAggs.length);
}
final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()];
for (int i = 0; i < metricAggs.length; i++) {
AggregatorFactory metricAgg = metricAggs[i];
sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg;
int metricIndex = mergedMetrics.indexOf(metricAgg.getName());
/*
If metricIndex is negative, one of the metricAggs was not present in the union of metrics from the indices
we are merging
*/
if (metricIndex > -1) {
sortedMetricAggs[metricIndex] = metricAgg;
}
}
/*
If there is nothing at sortedMetricAggs[i], then we did not have a metricAgg whose name matched the name
of the ith element of mergedMetrics. I.e. There was a metric in the indices to merge that we did not ask for.
*/
for (int i = 0; i < sortedMetricAggs.length; i++) {
if (sortedMetricAggs[i] == null) {
throw new IAE("Indices to merge contained metric[%s], but requested metrics did not", mergedMetrics.get(i));
}
}
for (int i = 0; i < mergedMetrics.size(); i++) {
@ -377,7 +390,7 @@ public class IndexMerger
throw new IAE(
"Metric mismatch, index[%d] [%s] != [%s]",
i,
metricAggs[i].getName(),
sortedMetricAggs[i].getName(),
mergedMetrics.get(i)
);
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.common.IAE;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity;
@ -1466,7 +1467,108 @@ public class IndexMergerTest
tmpDirMerged,
indexSpec
);
}
@Test
public void testMismatchedMetrics() throws IOException
{
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
closer.closeLater(index1);
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
closer.closeLater(index2);
IncrementalIndex index3 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index3);
IncrementalIndex index4 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index4);
IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index5);
Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index1, factory),
new IncrementalIndexAdapter(interval, index2, factory),
new IncrementalIndexAdapter(interval, index3, factory),
new IncrementalIndexAdapter(interval, index4, factory),
new IncrementalIndexAdapter(interval, index5, factory)
);
final File tmpDirMerged = temporaryFolder.newFolder();
File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("D", "D")
},
tmpDirMerged,
indexSpec
);
// Since D was not present in any of the indices, it is not present in the output
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
@Test(expected = IAE.class)
public void testMismatchedMetricsVarying() throws IOException
{
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
closer.closeLater(index2);
IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index5);
Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index2, factory)
);
final File tmpDirMerged = temporaryFolder.newFolder();
final File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[] {
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("D", "D")
},
tmpDirMerged,
indexSpec
);
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
private IncrementalIndex getIndexD3() throws Exception