diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index bf7b143ab92..d341663fc91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -356,13 +356,13 @@ public class TaskQueue // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec. task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock()); + defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent); // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to // using the legacy protocol. task.addToContextIfAbsent( SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION ); - defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent); giant.lock(); @@ -720,4 +720,10 @@ public class TaskQueue return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); } + + @VisibleForTesting + List getTasks() + { + return tasks; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 94c8dd28444..bdeef0511ed 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.TaskStatus; @@ -30,6 +31,8 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.IngestionTestBase; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; @@ -38,6 +41,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -49,6 +53,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; public class TaskQueueTest extends IngestionTestBase @@ -109,6 +114,161 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertTrue(task2.isDone()); } + @Test + public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + new DefaultTaskConfig(), + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + final List tasks = taskQueue.getTasks(); + Assert.assertEquals(1, tasks.size()); + final Task queuedTask = tasks.get(0); + Assert.assertTrue( + queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY) + ); + } + + @Test + public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + new DefaultTaskConfig() + { + @Override + public Map getContext() + { + return ImmutableMap.of( + SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, + false + ); + } + }, + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + final List tasks = taskQueue.getTasks(); + Assert.assertEquals(1, tasks.size()); + final Task queuedTask = tasks.get(0); + Assert.assertFalse( + queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY) + ); + } + + @Test + public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + new DefaultTaskConfig(), + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + final Task task = new TestTask( + "t1", + Intervals.of("2021-01-01/P1D"), + ImmutableMap.of( + SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, + false + ) + ); + taskQueue.add(task); + final List tasks = taskQueue.getTasks(); + Assert.assertEquals(1, tasks.size()); + final Task queuedTask = tasks.get(0); + Assert.assertFalse( + queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY) + ); + } + + @Test + public void testLockConfigTakePrecedenceThanDefaultTaskContext() throws EntryExistsException + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + new DefaultTaskConfig() + { + @Override + public Map getContext() + { + return ImmutableMap.of( + Tasks.FORCE_TIME_CHUNK_LOCK_KEY, + false + ); + } + }, + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + final List tasks = taskQueue.getTasks(); + Assert.assertEquals(1, tasks.size()); + final Task queuedTask = tasks.get(0); + Assert.assertTrue(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY)); + } + + @Test + public void testUserProvidedContextOverrideLockConfig() throws EntryExistsException + { + final TaskActionClientFactory actionClientFactory = createActionClientFactory(); + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + new DefaultTaskConfig(), + getTaskStorage(), + new SimpleTaskRunner(actionClientFactory), + actionClientFactory, + getLockbox(), + new NoopServiceEmitter() + ); + taskQueue.setActive(true); + final Task task = new TestTask( + "t1", + Intervals.of("2021-01-01/P1D"), + ImmutableMap.of( + Tasks.FORCE_TIME_CHUNK_LOCK_KEY, + false + ) + ); + taskQueue.add(task); + final List tasks = taskQueue.getTasks(); + Assert.assertEquals(1, tasks.size()); + final Task queuedTask = tasks.get(0); + Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY)); + } + private static class TestTask extends AbstractBatchIndexTask { private final Interval interval; @@ -116,7 +276,12 @@ public class TaskQueueTest extends IngestionTestBase private TestTask(String id, Interval interval) { - super(id, "datasource", null); + this(id, interval, null); + } + + private TestTask(String id, Interval interval, Map context) + { + super(id, "datasource", context); this.interval = interval; }