diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 04c622d3e7c..bdf250aa9c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -207,6 +208,20 @@ public class CompactionTask extends AbstractBatchIndexTask this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; this.metricsSpec = metricsSpec; this.segmentGranularity = segmentGranularity; + // Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity` + // Now users should prefer to use `granularitySpec` + // In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding + // by picking one or another. + if (granularitySpec != null + && segmentGranularity != null + && !segmentGranularity.equals(granularitySpec.getSegmentGranularity())) { + throw new IAE(StringUtils.format( + "Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n" + + "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity", + segmentGranularity, + granularitySpec.getSegmentGranularity() + )); + } if (granularitySpec == null && segmentGranularity != null) { this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null); } else { @@ -544,7 +559,9 @@ public class CompactionTask extends AbstractBatchIndexTask segmentsToCompact, dimensionsSpec, metricsSpec, - granularitySpec == null ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) : granularitySpec.withSegmentGranularity(segmentGranularityToUse) + granularitySpec == null + ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) + : granularitySpec.withSegmentGranularity(segmentGranularityToUse) ); specs.add( @@ -658,7 +675,10 @@ public class CompactionTask extends AbstractBatchIndexTask log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse); } else { queryGranularityToUse = granularitySpec.getQueryGranularity(); - log.info("Generate compaction task spec with new query granularity overrided from input [%s]", queryGranularityToUse); + log.info( + "Generate compaction task spec with new query granularity overrided from input [%s]", + queryGranularityToUse + ); } final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index b0deee54655..35b242a0c4c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -73,8 +73,10 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; @@ -176,6 +178,9 @@ public class CompactionTaskTest private static final IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient(); private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); + private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT = + "Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n" + + "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity"; private static Map DIMENSIONS; private static List AGGREGATORS; @@ -379,11 +384,14 @@ public class CompactionTaskTest builder2.tuningConfig(createTuningConfig()); builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY)); final CompactionTask taskCreatedWithGranularitySpec = builder2.build(); - Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity()); + Assert.assertEquals( + taskCreatedWithGranularitySpec.getSegmentGranularity(), + taskCreatedWithSegmentGranularity.getSegmentGranularity() + ); } - @Test - public void testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity() + @Test(expected = IAE.class) + public void testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE() { final Builder builder = new Builder( DATA_SOURCE, @@ -394,8 +402,66 @@ public class CompactionTaskTest builder.tuningConfig(createTuningConfig()); builder.segmentGranularity(Granularities.HOUR); builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY)); + try { + builder.build(); + } + catch (IAE iae) { + Assert.assertEquals( + StringUtils.format( + CONFLICTING_SEGMENT_GRANULARITY_FORMAT, + Granularities.HOUR, + Granularities.MINUTE + ), + iae.getMessage() + ); + throw iae; + } + Assert.fail("Should not have reached here!"); + } + + @Test(expected = IAE.class) + public void testCreateCompactionTaskWithNullSegmentGranularityInGranularitySpecAndSegmentGranularityShouldSucceed() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); + builder.tuningConfig(createTuningConfig()); + builder.segmentGranularity(Granularities.HOUR); + builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY)); + try { + builder.build(); + } + catch (IAE iae) { + Assert.assertEquals( + StringUtils.format( + CONFLICTING_SEGMENT_GRANULARITY_FORMAT, + Granularities.HOUR, + null + ), + iae.getMessage() + ); + throw iae; + } + Assert.fail("Should not have reached here!"); + } + + @Test + public void testCreateCompactionTaskWithSameGranularitySpecAndSegmentGranularityShouldSucceed() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); + builder.tuningConfig(createTuningConfig()); + builder.segmentGranularity(Granularities.HOUR); + builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY)); final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); - Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity()); + Assert.assertEquals(Granularities.HOUR, taskCreatedWithSegmentGranularity.getSegmentGranularity()); } @Test @@ -1201,7 +1267,8 @@ public class CompactionTaskTest } @Test - public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity() throws IOException, SegmentLoadingException + public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity() + throws IOException, SegmentLoadingException { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox,