CompactionTask throws exception on conflicting segmentGranularity (#10996)

* CompactionTask throws exception on conflicting segmentGranularity

* add comment
This commit is contained in:
Suneet Saldanha 2021-03-16 12:51:50 -07:00 committed by GitHub
parent f37713dc6d
commit 6b0c2e8996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 7 deletions

View File

@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
@ -207,6 +208,20 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
// Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity`
// Now users should prefer to use `granularitySpec`
// In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding
// by picking one or another.
if (granularitySpec != null
&& segmentGranularity != null
&& !segmentGranularity.equals(granularitySpec.getSegmentGranularity())) {
throw new IAE(StringUtils.format(
"Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n"
+ "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity",
segmentGranularity,
granularitySpec.getSegmentGranularity()
));
}
if (granularitySpec == null && segmentGranularity != null) {
this.granularitySpec = new ClientCompactionTaskGranularitySpec(segmentGranularity, null);
} else {
@ -544,7 +559,9 @@ public class CompactionTask extends AbstractBatchIndexTask
segmentsToCompact,
dimensionsSpec,
metricsSpec,
granularitySpec == null ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null) : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
granularitySpec == null
? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
specs.add(
@ -658,7 +675,10 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info("Generate compaction task spec with new query granularity overrided from input [%s]", queryGranularityToUse);
log.info(
"Generate compaction task spec with new query granularity overrided from input [%s]",
queryGranularityToUse
);
}
final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(

View File

@ -73,8 +73,10 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@ -176,6 +178,9 @@ public class CompactionTaskTest
private static final IndexingServiceClient INDEXING_SERVICE_CLIENT = new NoopIndexingServiceClient();
private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT =
"Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n"
+ "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity";
private static Map<String, DimensionSchema> DIMENSIONS;
private static List<AggregatorFactory> AGGREGATORS;
@ -379,11 +384,14 @@ public class CompactionTaskTest
builder2.tuningConfig(createTuningConfig());
builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(), taskCreatedWithSegmentGranularity.getSegmentGranularity());
Assert.assertEquals(
taskCreatedWithGranularitySpec.getSegmentGranularity(),
taskCreatedWithSegmentGranularity.getSegmentGranularity()
);
}
@Test
public void testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity()
@Test(expected = IAE.class)
public void testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
{
final Builder builder = new Builder(
DATA_SOURCE,
@ -394,8 +402,66 @@ public class CompactionTaskTest
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY));
try {
builder.build();
}
catch (IAE iae) {
Assert.assertEquals(
StringUtils.format(
CONFLICTING_SEGMENT_GRANULARITY_FORMAT,
Granularities.HOUR,
Granularities.MINUTE
),
iae.getMessage()
);
throw iae;
}
Assert.fail("Should not have reached here!");
}
@Test(expected = IAE.class)
public void testCreateCompactionTaskWithNullSegmentGranularityInGranularitySpecAndSegmentGranularityShouldSucceed()
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY));
try {
builder.build();
}
catch (IAE iae) {
Assert.assertEquals(
StringUtils.format(
CONFLICTING_SEGMENT_GRANULARITY_FORMAT,
Granularities.HOUR,
null
),
iae.getMessage()
);
throw iae;
}
Assert.fail("Should not have reached here!");
}
@Test
public void testCreateCompactionTaskWithSameGranularitySpecAndSegmentGranularityShouldSucceed()
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
Assert.assertEquals(Granularities.MINUTE, taskCreatedWithSegmentGranularity.getSegmentGranularity());
Assert.assertEquals(Granularities.HOUR, taskCreatedWithSegmentGranularity.getSegmentGranularity());
}
@Test
@ -1201,7 +1267,8 @@ public class CompactionTaskTest
}
@Test
public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity() throws IOException, SegmentLoadingException
public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity()
throws IOException, SegmentLoadingException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,