From 0b0056b77f4c071bc1005844e09fe6a559a78423 Mon Sep 17 00:00:00 2001 From: Chi Cao Minh Date: Tue, 21 Jan 2020 12:59:43 -0800 Subject: [PATCH] More tests for range partition parallel indexing (#9232) Add more unit tests for range partition native batch parallel indexing. Also, fix a bug where ParallelIndexPhaseRunner incorrectly thinks that identical collected DimensionDistributionReports are not equal due to not overriding equals() in DimensionDistributionReport. --- indexing-service/pom.xml | 5 +++ .../parallel/DimensionDistributionReport.java | 21 +++++++++++ .../parallel/distribution/StringSketch.java | 35 +++++++++++++++++++ ...bstractMultiPhaseParallelIndexingTest.java | 11 +++--- .../DimensionDistributionReportTest.java | 9 +++++ ...rtitionMultiPhaseParallelIndexingTest.java | 4 ++- ...rtitionMultiPhaseParallelIndexingTest.java | 22 ++++++++---- .../distribution/StringSketchTest.java | 10 ++++++ pom.xml | 2 +- 9 files changed, 106 insertions(+), 13 deletions(-) diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 87148665716..19397787a85 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -272,6 +272,11 @@ assertj-core test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java index a2e6dd0c476..a7aea29a284 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringD import org.joda.time.Interval; import java.util.Map; +import java.util.Objects; public class DimensionDistributionReport implements SubTaskReport { @@ -65,4 +66,24 @@ public class DimensionDistributionReport implements SubTaskReport ", intervalToDistribution=" + intervalToDistribution + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DimensionDistributionReport that = (DimensionDistributionReport) o; + return Objects.equals(taskId, that.taskId) && + Objects.equals(intervalToDistribution, that.intervalToDistribution); + } + + @Override + public int hashCode() + { + return Objects.hash(taskId, intervalToDistribution); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java index bba16cc4662..dc4dbe1e06a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java @@ -37,6 +37,7 @@ import org.apache.datasketches.quantiles.ItemsSketch; import java.io.IOException; import java.util.Comparator; +import java.util.Objects; /** * Counts approximate frequencies of strings. @@ -137,6 +138,40 @@ public class StringSketch implements StringDistribution '}'; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StringSketch that = (StringSketch) o; + + // ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry. + // However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140 + // + // Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This + // check is best-effort as it is possible for it to return true for sketches that contain different values. + return delegate.getK() == that.delegate.getK() && + delegate.getN() == that.delegate.getN() && + Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue()) && + Objects.equals(delegate.getMinValue(), that.delegate.getMinValue()); + } + + @Override + public int hashCode() + { + // See comment in equals() regarding ItemsSketch. + return Objects.hash( + delegate.getK(), + delegate.getN(), + delegate.getMaxValue(), + delegate.getMinValue() + ); + } + ItemsSketch getDelegate() { return delegate; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index 880d8065e17..a7c4396b780 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -107,7 +107,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn Interval interval, File inputDir, String filter, - DimensionBasedPartitionsSpec partitionsSpec + DimensionBasedPartitionsSpec partitionsSpec, + int maxNumConcurrentSubTasks ) throws Exception { final ParallelIndexSupervisorTask task = newTask( @@ -115,7 +116,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn interval, inputDir, filter, - partitionsSpec + partitionsSpec, + maxNumConcurrentSubTasks ); actionClient = createActionClient(task); @@ -137,7 +139,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn Interval interval, File inputDir, String filter, - DimensionBasedPartitionsSpec partitionsSpec + DimensionBasedPartitionsSpec partitionsSpec, + int maxNumConcurrentSubTasks ) { GranularitySpec granularitySpec = new UniformGranularitySpec( @@ -163,7 +166,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn null, null, null, - 2, + maxNumConcurrentSubTasks, null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java index c23362f3e9c..e82041be814 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch; import org.apache.druid.java.util.common.Intervals; @@ -52,4 +53,12 @@ public class DimensionDistributionReportTest { TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); } + + @Test + public void abidesEqualsContract() + { + EqualsVerifier.forClass(DimensionDistributionReport.class) + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index dffd9d52e8b..7219f16ef88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh false, 0 ); + private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2; @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") public static Iterable constructorFeeder() @@ -129,7 +130,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh Intervals.of("2017/2018"), inputDir, "test_*", - new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")) + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), + MAX_NUM_CONCURRENT_SUB_TASKS ); assertHashedPartition(publishedSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index 94ccf5cb03a..53cb7593def 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP 0 ); - @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") + @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}") public static Iterable constructorFeeder() { return ImmutableList.of( - new Object[]{LockGranularity.TIME_CHUNK, false}, - new Object[]{LockGranularity.TIME_CHUNK, true}, - new Object[]{LockGranularity.SEGMENT, true} + new Object[]{LockGranularity.TIME_CHUNK, false, 2}, + new Object[]{LockGranularity.TIME_CHUNK, true, 2}, + new Object[]{LockGranularity.SEGMENT, true, 2}, + new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor ); } private File inputDir; private SetMultimap intervalToDim1; - public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi) + private final int maxNumConcurrentSubTasks; + + public RangePartitionMultiPhaseParallelIndexingTest( + LockGranularity lockGranularity, + boolean useInputFormatApi, + int maxNumConcurrentSubTasks + ) { super(lockGranularity, useInputFormatApi); + this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks; } @Override @@ -169,7 +177,8 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP null, DIM1, false - ) + ), + maxNumConcurrentSubTasks ); assertRangePartitions(publishedSegments); } @@ -362,7 +371,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP } } - private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner extends PartialGenericSegmentMergeParallelIndexTaskRunner { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java index b09634df3f8..d82e21253c7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.datasketches.quantiles.ItemsSketch; import org.apache.druid.jackson.JacksonModule; import org.apache.druid.java.util.common.StringUtils; @@ -69,6 +70,15 @@ public class StringSketchTest target.put(MAX_STRING); TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target); } + + @Test + public void abidesEqualsContract() + { + EqualsVerifier.forClass(StringSketch.class) + .usingGetClass() + .withNonnullFields("delegate") + .verify(); + } } public static class PutTest diff --git a/pom.xml b/pom.xml index d105aff2d0f..bfc211ecac7 100644 --- a/pom.xml +++ b/pom.xml @@ -1201,7 +1201,7 @@ nl.jqno.equalsverifier equalsverifier - 3.1.10 + 3.1.11 test