More tests for range partition parallel indexing (#9232)

Add more unit tests for range partition native batch parallel indexing.

Also, fix a bug where ParallelIndexPhaseRunner incorrectly thinks that
identical collected DimensionDistributionReports are not equal due to
not overriding equals() in DimensionDistributionReport.
This commit is contained in:
Chi Cao Minh 2020-01-21 12:59:43 -08:00 committed by Clint Wylie
parent a2939bbd1a
commit 0b0056b77f
9 changed files with 106 additions and 13 deletions

View File

@ -272,6 +272,11 @@
<artifactId>assertj-core</artifactId> <artifactId>assertj-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringD
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public class DimensionDistributionReport implements SubTaskReport public class DimensionDistributionReport implements SubTaskReport
{ {
@ -65,4 +66,24 @@ public class DimensionDistributionReport implements SubTaskReport
", intervalToDistribution=" + intervalToDistribution + ", intervalToDistribution=" + intervalToDistribution +
'}'; '}';
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DimensionDistributionReport that = (DimensionDistributionReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(intervalToDistribution, that.intervalToDistribution);
}
@Override
public int hashCode()
{
return Objects.hash(taskId, intervalToDistribution);
}
} }

View File

@ -37,6 +37,7 @@ import org.apache.datasketches.quantiles.ItemsSketch;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.Objects;
/** /**
* Counts approximate frequencies of strings. * Counts approximate frequencies of strings.
@ -137,6 +138,40 @@ public class StringSketch implements StringDistribution
'}'; '}';
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringSketch that = (StringSketch) o;
// ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry.
// However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140
//
// Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This
// check is best-effort as it is possible for it to return true for sketches that contain different values.
return delegate.getK() == that.delegate.getK() &&
delegate.getN() == that.delegate.getN() &&
Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue()) &&
Objects.equals(delegate.getMinValue(), that.delegate.getMinValue());
}
@Override
public int hashCode()
{
// See comment in equals() regarding ItemsSketch.
return Objects.hash(
delegate.getK(),
delegate.getN(),
delegate.getMaxValue(),
delegate.getMinValue()
);
}
ItemsSketch<String> getDelegate() ItemsSketch<String> getDelegate()
{ {
return delegate; return delegate;

View File

@ -107,7 +107,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
Interval interval, Interval interval,
File inputDir, File inputDir,
String filter, String filter,
DimensionBasedPartitionsSpec partitionsSpec DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
) throws Exception ) throws Exception
{ {
final ParallelIndexSupervisorTask task = newTask( final ParallelIndexSupervisorTask task = newTask(
@ -115,7 +116,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
interval, interval,
inputDir, inputDir,
filter, filter,
partitionsSpec partitionsSpec,
maxNumConcurrentSubTasks
); );
actionClient = createActionClient(task); actionClient = createActionClient(task);
@ -137,7 +139,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
Interval interval, Interval interval,
File inputDir, File inputDir,
String filter, String filter,
DimensionBasedPartitionsSpec partitionsSpec DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
) )
{ {
GranularitySpec granularitySpec = new UniformGranularitySpec( GranularitySpec granularitySpec = new UniformGranularitySpec(
@ -163,7 +166,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
null, null,
null, null,
null, null,
2, maxNumConcurrentSubTasks,
null, null,
null, null,
null, null,

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel; package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
@ -52,4 +53,12 @@ public class DimensionDistributionReportTest
{ {
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
} }
@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(DimensionDistributionReport.class)
.usingGetClass()
.verify();
}
} }

View File

@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
false, false,
0 0
); );
private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder() public static Iterable<Object[]> constructorFeeder()
@ -129,7 +130,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
Intervals.of("2017/2018"), Intervals.of("2017/2018"),
inputDir, inputDir,
"test_*", "test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")) new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
MAX_NUM_CONCURRENT_SUB_TASKS
); );
assertHashedPartition(publishedSegments); assertHashedPartition(publishedSegments);
} }

View File

@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
0 0
); );
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
public static Iterable<Object[]> constructorFeeder() public static Iterable<Object[]> constructorFeeder()
{ {
return ImmutableList.of( return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false}, new Object[]{LockGranularity.TIME_CHUNK, false, 2},
new Object[]{LockGranularity.TIME_CHUNK, true}, new Object[]{LockGranularity.TIME_CHUNK, true, 2},
new Object[]{LockGranularity.SEGMENT, true} new Object[]{LockGranularity.SEGMENT, true, 2},
new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
); );
} }
private File inputDir; private File inputDir;
private SetMultimap<Interval, String> intervalToDim1; private SetMultimap<Interval, String> intervalToDim1;
public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi) private final int maxNumConcurrentSubTasks;
public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks
)
{ {
super(lockGranularity, useInputFormatApi); super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
} }
@Override @Override
@ -169,7 +177,8 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
null, null,
DIM1, DIM1,
false false
) ),
maxNumConcurrentSubTasks
); );
assertRangePartitions(publishedSegments); assertRangePartitions(publishedSegments);
} }
@ -362,7 +371,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
} }
} }
private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
extends PartialGenericSegmentMergeParallelIndexTaskRunner extends PartialGenericSegmentMergeParallelIndexTaskRunner
{ {

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution; package org.apache.druid.indexing.common.task.batch.parallel.distribution;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.quantiles.ItemsSketch; import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.jackson.JacksonModule; import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -69,6 +70,15 @@ public class StringSketchTest
target.put(MAX_STRING); target.put(MAX_STRING);
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
} }
@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(StringSketch.class)
.usingGetClass()
.withNonnullFields("delegate")
.verify();
}
} }
public static class PutTest public static class PutTest

View File

@ -1201,7 +1201,7 @@
<dependency> <dependency>
<groupId>nl.jqno.equalsverifier</groupId> <groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId> <artifactId>equalsverifier</artifactId>
<version>3.1.10</version> <version>3.1.11</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>