SegmentMetadataQuery: Fix merging of ColumnAnalysis errors.

Also add tests for:
- ColumnAnalysis folding
- Mixed mmap/incremental merging
This commit is contained in:
Gian Merlino 2016-01-26 15:58:47 -08:00
parent 3844658fb5
commit 795343f7ef
3 changed files with 191 additions and 36 deletions

View File

@ -98,6 +98,14 @@ public class ColumnAnalysis
return this; return this;
} }
if (isError() && rhs.isError()) {
return errorMessage.equals(rhs.getErrorMessage()) ? this : ColumnAnalysis.error("multiple_errors");
} else if (isError()) {
return this;
} else if (rhs.isError()) {
return rhs;
}
if (!type.equals(rhs.getType())) { if (!type.equals(rhs.getType())) {
return ColumnAnalysis.error("cannot_merge_diff_types"); return ColumnAnalysis.error("cannot_merge_diff_types");
} }

View File

@ -95,24 +95,34 @@ public class SegmentMetadataQueryTest
); );
} }
private final QueryRunner runner; private final QueryRunner runner1;
private final boolean usingMmappedSegment; private final QueryRunner runner2;
private final boolean mmap1;
private final boolean mmap2;
private final SegmentMetadataQuery testQuery; private final SegmentMetadataQuery testQuery;
private final SegmentAnalysis expectedSegmentAnalysis; private final SegmentAnalysis expectedSegmentAnalysis1;
private final SegmentAnalysis expectedSegmentAnalysis2;
@Parameterized.Parameters(name = "runner = {1}") @Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}")
public static Collection<Object[]> constructorFeeder() public static Collection<Object[]> constructorFeeder()
{ {
return ImmutableList.of( return ImmutableList.of(
new Object[]{makeMMappedQueryRunner(FACTORY), "mmap", true}, new Object[]{true, true},
new Object[]{makeIncrementalIndexQueryRunner(FACTORY), "incremental", false} new Object[]{true, false},
new Object[]{false, true},
new Object[]{false, false}
); );
} }
public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean usingMmappedSegment) public SegmentMetadataQueryTest(
boolean mmap1,
boolean mmap2
)
{ {
this.runner = runner; this.runner1 = mmap1 ? makeMMappedQueryRunner(FACTORY) : makeIncrementalIndexQueryRunner(FACTORY);
this.usingMmappedSegment = usingMmappedSegment; this.runner2 = mmap2 ? makeMMappedQueryRunner(FACTORY) : makeIncrementalIndexQueryRunner(FACTORY);
this.mmap1 = mmap1;
this.mmap2 = mmap2;
testQuery = Druids.newSegmentMetadataQueryBuilder() testQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing") .dataSource("testing")
.intervals("2013/2014") .intervals("2013/2014")
@ -121,7 +131,7 @@ public class SegmentMetadataQueryTest
.merge(true) .merge(true)
.build(); .build();
expectedSegmentAnalysis = new SegmentAnalysis( expectedSegmentAnalysis1 = new SegmentAnalysis(
"testSegment", "testSegment",
ImmutableList.of( ImmutableList.of(
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z") new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
@ -139,7 +149,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
false, false,
usingMmappedSegment ? 10881 : 0, mmap1 ? 10881 : 0,
1, 1,
null null
), ),
@ -151,7 +161,41 @@ public class SegmentMetadataQueryTest
null, null,
null null
) )
), usingMmappedSegment ? 71982 : 32643, ), mmap1 ? 71982 : 32643,
1209,
null
);
expectedSegmentAnalysis2 = new SegmentAnalysis(
"testSegment",
ImmutableList.of(
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
),
ImmutableMap.of(
"__time",
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
12090,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
mmap2 ? 10881 : 0,
1,
null
),
"index",
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
9672,
null,
null
)
), mmap2 ? 71982 : 32643,
1209, 1209,
null null
); );
@ -162,11 +206,11 @@ public class SegmentMetadataQueryTest
public void testSegmentMetadataQuery() public void testSegmentMetadataQuery()
{ {
List<SegmentAnalysis> results = Sequences.toList( List<SegmentAnalysis> results = Sequences.toList(
runner.run(testQuery, Maps.newHashMap()), runner1.run(testQuery, Maps.newHashMap()),
Lists.<SegmentAnalysis>newArrayList() Lists.<SegmentAnalysis>newArrayList()
); );
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis), results); Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis1), results);
} }
@Test @Test
@ -194,19 +238,21 @@ public class SegmentMetadataQueryTest
) )
), ),
0, 0,
expectedSegmentAnalysis.getNumRows() * 2, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null null
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(
FACTORY.mergeRunners( FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
) )
), ),
toolChest toolChest
@ -254,19 +300,21 @@ public class SegmentMetadataQueryTest
) )
), ),
0, 0,
expectedSegmentAnalysis.getNumRows() * 2, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null null
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(
FACTORY.mergeRunners( FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
) )
), ),
toolChest toolChest
@ -294,7 +342,7 @@ public class SegmentMetadataQueryTest
{ {
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis( SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged", "merged",
ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)), ImmutableList.of(expectedSegmentAnalysis1.getIntervals().get(0)),
ImmutableMap.of( ImmutableMap.of(
"__time", "__time",
new ColumnAnalysis( new ColumnAnalysis(
@ -308,7 +356,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
false, false,
usingMmappedSegment ? 21762 : 0, 10881 * ((mmap1 ? 1 : 0) + (mmap2 ? 1 : 0)),
1, 1,
null null
), ),
@ -321,20 +369,22 @@ public class SegmentMetadataQueryTest
null null
) )
), ),
expectedSegmentAnalysis.getSize() * 2, expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(),
expectedSegmentAnalysis.getNumRows() * 2, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null null
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(
FACTORY.mergeRunners( FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
) )
), ),
toolChest toolChest
@ -368,19 +418,21 @@ public class SegmentMetadataQueryTest
) )
), ),
0, 0,
expectedSegmentAnalysis.getNumRows() * 2, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null null
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(
FACTORY.mergeRunners( FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
) )
), ),
toolChest toolChest
@ -424,19 +476,21 @@ public class SegmentMetadataQueryTest
) )
), ),
0, 0,
expectedSegmentAnalysis.getNumRows() * 2, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
expectedAggregators expectedAggregators
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(
FACTORY.mergeRunners( FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner) Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
) )
), ),
toolChest toolChest
@ -463,17 +517,17 @@ public class SegmentMetadataQueryTest
public void testBySegmentResults() public void testBySegmentResults()
{ {
Result<BySegmentResultValue> bySegmentResult = new Result<BySegmentResultValue>( Result<BySegmentResultValue> bySegmentResult = new Result<BySegmentResultValue>(
expectedSegmentAnalysis.getIntervals().get(0).getStart(), expectedSegmentAnalysis1.getIntervals().get(0).getStart(),
new BySegmentResultValueClass( new BySegmentResultValueClass(
Arrays.asList( Arrays.asList(
expectedSegmentAnalysis expectedSegmentAnalysis1
), expectedSegmentAnalysis.getId(), testQuery.getIntervals().get(0) ), expectedSegmentAnalysis1.getId(), testQuery.getIntervals().get(0)
) )
); );
QueryToolChest toolChest = FACTORY.getToolchest(); QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner); QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner1);
ExecutorService exec = Executors.newCachedThreadPool(); ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>( QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults( toolChest.mergeResults(

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.metadata.metadata;
import org.junit.Assert;
import org.junit.Test;
public class ColumnAnalysisTest
{
@Test
public void testFoldStringColumns()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", true, 3L, 4, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", true, 4L, 4, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
@Test
public void testFoldWithNull()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
Assert.assertEquals(analysis1, analysis1.fold(null));
}
@Test
public void testFoldComplexColumns()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis expected = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
@Test
public void testFoldDifferentTypes()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 1L, 1, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("COMPLEX", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:cannot_merge_diff_types");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
@Test
public void testFoldSameErrors()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("foo");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
@Test
public void testFoldErrorAndNoError()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
@Test
public void testFoldDifferentErrors()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("bar");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:multiple_errors");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
}