Fix the task id creation in CompactionTask (#10445)

* Fix the task id creation in CompactionTask

* Address review comments

* Ignore test for range partitioning and segment lock
Abhishek Agarwal 2020-10-02 03:32:04 +05:30 committed by GitHub
parent d057c5149f
commit e282ab5695
4 changed files with 127 additions and 22 deletions
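
The gist of the fix: CompactionTask decided whether its generated index tasks share the supervisor's task id by looking only at maxNumConcurrentSubTasks, while ParallelIndexSupervisorTask decides parallel vs. sequential execution from a richer predicate (input splittability plus the partitioning scheme). The two checks could disagree; for example, range partitioning runs in parallel mode even with maxNumConcurrentSubTasks == 1, so the old code handed out per-spec fake ids where the supervisor id was needed. Below is a minimal, self-contained sketch of the before/after decision logic; the SubtaskIdSelection class and its boolean parameters are hypothetical stand-ins for illustration, not Druid APIs.

    // Hypothetical sketch of the id-selection logic before and after this commit.
    // The real code consults InputSource.isSplittable() and the PartitionsSpec type;
    // a null maxNumConcurrentSubTasks here models a null tuningConfig.
    class SubtaskIdSelection
    {
      // Before: only the configured subtask count was consulted.
      static boolean oldUsesSupervisorId(Integer maxNumConcurrentSubTasks)
      {
        return maxNumConcurrentSubTasks != null && maxNumConcurrentSubTasks != 1;
      }

      // After: mirrors ParallelIndexSupervisorTask.isParallelMode().
      static boolean newUsesSupervisorId(boolean splittable, boolean rangePartitioned, Integer maxNumConcurrentSubTasks)
      {
        if (maxNumConcurrentSubTasks == null) {
          return false;
        }
        // Range partitioning has no sequential fallback, so one subtask is enough.
        int minRequired = rangePartitioned ? 1 : 2;
        return splittable && maxNumConcurrentSubTasks >= minRequired;
      }

      public static void main(String[] args)
      {
        // Range partitioning with a single subtask: parallel mode, but the old
        // check picked per-spec fake ids. This is the mismatch the commit fixes.
        System.out.println(oldUsesSupervisorId(1));             // false
        System.out.println(newUsesSupervisorId(true, true, 1)); // true
      }
    }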

CompactionTask.java

@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 import org.apache.curator.shaded.com.google.common.base.Verify;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -361,10 +362,14 @@ public class CompactionTask extends AbstractBatchIndexTask
           // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
           // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
           // In this case, we use different fake IDs for each created index task.
-          final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
-                                   ? createIndexTaskSpecId(i)
-                                   : getId();
-          return newTask(subtaskId, ingestionSpecs.get(i));
+          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
+          InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource(
+              ingestionSpec.getDataSchema().getParser()
+          );
+          final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)
+                                   ? getId()
+                                   : createIndexTaskSpecId(i);
+          return newTask(subtaskId, ingestionSpec);
         })
         .collect(Collectors.toList());
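
Note the design choice here: rather than keeping a second copy of the parallel-mode condition inside CompactionTask, the condition is extracted into the public static ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig) shown in the next file, so both call sites share a single predicate and cannot drift apart again.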

ParallelIndexSupervisorTask.java

@@ -466,18 +466,25 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
   }
 
-  private boolean isParallelMode()
+  public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
   {
+    if (null == tuningConfig) {
+      return false;
+    }
+    boolean useRangePartitions = useRangePartitions(tuningConfig);
     // Range partitioning is not implemented for runSequential() (but hash partitioning is)
-    int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2;
-
-    return baseInputSource.isSplittable()
-           && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+    int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
+    return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
   }
 
-  private boolean useRangePartitions()
+  private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
   {
-    return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+    return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+  }
+
+  private boolean isParallelMode()
+  {
+    return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
   }
 
   /**
@@ -512,7 +519,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
    */
   private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
-    return useRangePartitions()
+    return useRangePartitions(ingestionSchema.getTuningConfig())
            ? runRangePartitionMultiPhaseParallel(toolbox)
            : runHashPartitionMultiPhaseParallel(toolbox);
   }
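
For reference, the outcomes of the new predicate, read directly off the code above:

    tuningConfig   isSplittable()   partitionsSpec                  maxNumConcurrentSubTasks   isParallelMode
    null           any              any                             any                        false
    non-null       false            any                             any                        false
    non-null       true             SingleDimensionPartitionsSpec   >= 1                       true (0: false)
    non-null       true             any other spec                  >= 2                       true (<= 1: false)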

CompactionTaskParallelRunTest.java

@@ -63,6 +63,7 @@ import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
          lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
          segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
@@ -161,9 +162,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   public void testRunParallelWithHashPartitioningMatchCompactionState()
   {
     // Hash partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -182,7 +181,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -192,9 +191,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   public void testRunParallelWithRangePartitioning()
   {
     // Range partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -213,7 +210,36 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
+      Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+      Assert.assertEquals(expectedState, segment.getLastCompactionState());
+    }
+  }
+
+  @Test
+  public void testRunParallelWithRangePartitioningWithSingleTask()
+  {
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+    final CompactionState expectedState = new CompactionState(
+        new SingleDimensionPartitionsSpec(7, null, "dim", false),
+        compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
+    );
+    for (DataSegment segment : compactedSegments) {
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -242,7 +268,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
          lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
          segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertEquals(null, segment.getLastCompactionState());
     }
   }
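
A side note on the test changes above: replacing the early if (lockGranularity == LockGranularity.SEGMENT) { return; } guard with Assume.assumeFalse(...) makes JUnit report these parameterized cases as skipped rather than silently passing, which is what JUnit's Assume API is designed for.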

ParallelIndexSupervisorTaskTest.java

@@ -21,11 +21,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Ordering;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.segment.IndexSpec;
@@ -36,6 +39,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
+import org.easymock.EasyMock;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -55,6 +59,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+
 @RunWith(Enclosed.class)
 public class ParallelIndexSupervisorTaskTest
 {
@@ -241,4 +248,64 @@ public class ParallelIndexSupervisorTaskTest
       );
     }
   }
+
+  public static class StaticUtilsTest
+  {
+    @Test
+    public void testIsParallelModeFalse_nullTuningConfig()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_rangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_notRangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_inputSourceNotSplittable()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(false).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+  }
 }
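
One detail worth calling out in these tests: EasyMock's chained expectation andReturn(0).andReturn(1).andReturn(2) makes successive calls to getMaxNumConcurrentSubTasks() return 0, then 1, then 2, which is why each test invokes isParallelMode three times against the same mock, exercising both sides of the minimum-subtask boundary.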