fix issue #2744. Check for null before combining metrics (#2774)

This commit is contained in:
michaelschiff 2016-04-12 14:46:31 -07:00 committed by Fangjin Yang
parent ed5377465a
commit db35dd7508
2 changed files with 156 additions and 8 deletions

View File

@ -1474,7 +1474,15 @@ public class IndexMerger
Object[] rhsMetrics = rhs.getMetrics();
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = metricAggs[i].combine(lhsMetrics[i], rhsMetrics[i]);
Object lhsMetric = lhsMetrics[i];
Object rhsMetric = rhsMetrics[i];
if (lhsMetric == null) {
metrics[i] = rhsMetric;
} else if (rhsMetric == null) {
metrics[i] = lhsMetric;
} else {
metrics[i] = metricAggs[i].combine(lhsMetric, rhsMetric);
}
}
final Rowboat retVal = new Rowboat(

View File

@ -875,7 +875,11 @@ public class IndexMergerTest
public void testMergeWithDimensionsList() throws Exception
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dimA", "dimB", "dimC")), null, null))
.withDimensionsSpec(new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dimA", "dimB", "dimC")),
null,
null
))
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
@ -1418,19 +1422,28 @@ public class IndexMergerTest
}
@Test
public void testMismatchedDimensions() throws IOException, IndexSizeExceededException
{
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
index1.add(new MapBasedInputRow(1L, Lists.newArrayList("d1", "d2"), ImmutableMap.<String, Object>of("d1", "a", "d2", "z", "A", 1)));
index1.add(new MapBasedInputRow(
1L,
Lists.newArrayList("d1", "d2"),
ImmutableMap.<String, Object>of("d1", "a", "d2", "z", "A", 1)
));
closer.closeLater(index1);
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
index2.add(new MapBasedInputRow(1L, Lists.newArrayList("d2"), ImmutableMap.<String, Object>of("d2", "z", "A", 2, "C", 100)));
index2.add(new MapBasedInputRow(
1L,
Lists.newArrayList("d1", "d2"),
ImmutableMap.<String, Object>of("d1", "a", "d2", "z", "A", 2, "C", 100)
));
closer.closeLater(index2);
Interval interval = new Interval(0, new DateTime().getMillis());
@ -1444,13 +1457,138 @@ public class IndexMergerTest
INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[] {
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C"),
},
tmpDirMerged,
indexSpec
);
}
@Test
public void testAddMetrics() throws IOException
{
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
closer.closeLater(index1);
long timestamp = System.currentTimeMillis();
index1.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "A", 5)
)
);
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
index2.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "A", 5, "C", 6)
)
);
closer.closeLater(index2);
Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index1, factory),
new IncrementalIndexAdapter(interval, index2, factory)
);
final File tmpDirMerged = temporaryFolder.newFolder();
File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
},
tmpDirMerged,
indexSpec
);
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(
merged)));
Assert.assertEquals(ImmutableSet.of("A", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
@Test
public void testAddMetricsBothSidesNull() throws IOException
{
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
closer.closeLater(index1);
long timestamp = System.currentTimeMillis();
index1.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "A", 5)
)
);
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
index2.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "A", 5, "C", 6)
)
);
closer.closeLater(index2);
IncrementalIndex index3 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
index3.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "A", 5)
)
);
Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index1, factory),
new IncrementalIndexAdapter(interval, index2, factory),
new IncrementalIndexAdapter(interval, index3, factory)
);
final File tmpDirMerged = temporaryFolder.newFolder();
File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
},
tmpDirMerged,
indexSpec
);
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(
merged)));
Assert.assertEquals(ImmutableSet.of("A", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
@Test
@ -1511,7 +1649,8 @@ public class IndexMergerTest
);
// Since D was not present in any of the indices, it is not present in the output
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(
merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
@ -1543,7 +1682,7 @@ public class IndexMergerTest
final File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[] {
new AggregatorFactory[]{
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("D", "D")
@ -1551,7 +1690,8 @@ public class IndexMergerTest
tmpDirMerged,
indexSpec
);
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(
merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}