Fix taskQueue to honor (#11243)

useLineageBasedSegmentAllocation in default taskContext
This commit is contained in:
Jihoon Son 2021-05-13 18:30:50 -07:00 committed by GitHub
parent 4a3c834ecf
commit 4100c5edc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 173 additions and 2 deletions

View File

@ -356,13 +356,13 @@ public class TaskQueue
// Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
// Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to
// using the legacy protocol.
task.addToContextIfAbsent(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
giant.lock();
@ -720,4 +720,10 @@ public class TaskQueue
return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
.collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
}
@VisibleForTesting
List<Task> getTasks()
{
return tasks;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskStatus;
@ -30,6 +31,8 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
@ -38,6 +41,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -49,6 +53,7 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
public class TaskQueueTest extends IngestionTestBase
@ -109,6 +114,161 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertTrue(task2.isDone());
}
@Test
public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0);
Assert.assertTrue(
queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY)
);
}
@Test
public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return ImmutableMap.of(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
false
);
}
},
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0);
Assert.assertFalse(
queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY)
);
}
@Test
public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask(
"t1",
Intervals.of("2021-01-01/P1D"),
ImmutableMap.of(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
false
)
);
taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0);
Assert.assertFalse(
queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY)
);
}
@Test
public void testLockConfigTakePrecedenceThanDefaultTaskContext() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return ImmutableMap.of(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
false
);
}
},
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0);
Assert.assertTrue(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
@Test
public void testUserProvidedContextOverrideLockConfig() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask(
"t1",
Intervals.of("2021-01-01/P1D"),
ImmutableMap.of(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
false
)
);
taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0);
Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
private static class TestTask extends AbstractBatchIndexTask
{
private final Interval interval;
@ -116,7 +276,12 @@ public class TaskQueueTest extends IngestionTestBase
private TestTask(String id, Interval interval)
{
super(id, "datasource", null);
this(id, interval, null);
}
private TestTask(String id, Interval interval, Map<String, Object> context)
{
super(id, "datasource", context);
this.interval = interval;
}