Clean up TaskQueueTest (#16187)

Changes:
- Remove redundant code from `TaskQueueTest`
- Use lambdas in `TaskQueue`
- Simplify error message when `TaskQueue` is full
This commit is contained in:
Kashif Faraz 2024-03-25 09:40:01 +05:30 committed by GitHub
parent 323d67a0ac
commit 82f443340d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 152 additions and 394 deletions

View File

@ -73,7 +73,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -186,9 +185,9 @@ public class TaskQueue
} }
@VisibleForTesting @VisibleForTesting
void setActive(boolean active) void setActive()
{ {
this.active = active; this.active = true;
} }
/** /**
@ -211,31 +210,26 @@ public class TaskQueue
"Shutting down forcefully as task failed to reacquire lock while becoming leader"); "Shutting down forcefully as task failed to reacquire lock while becoming leader");
} }
managerExec.submit( managerExec.submit(
new Runnable() () -> {
{ while (true) {
@Override try {
public void run() manage();
{ break;
while (true) { }
catch (InterruptedException e) {
log.info("Interrupted, exiting!");
break;
}
catch (Exception e) {
final long restartDelay = config.getRestartDelay().getMillis();
log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
try { try {
manage(); Thread.sleep(restartDelay);
break;
} }
catch (InterruptedException e) { catch (InterruptedException e2) {
log.info("Interrupted, exiting!"); log.info("Interrupted, exiting!");
break; break;
} }
catch (Exception e) {
final long restartDelay = config.getRestartDelay().getMillis();
log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
try {
Thread.sleep(restartDelay);
}
catch (InterruptedException e2) {
log.info("Interrupted, exiting!");
break;
}
}
} }
} }
} }
@ -243,25 +237,20 @@ public class TaskQueue
ScheduledExecutors.scheduleAtFixedRate( ScheduledExecutors.scheduleAtFixedRate(
storageSyncExec, storageSyncExec,
config.getStorageSyncRate(), config.getStorageSyncRate(),
new Callable<ScheduledExecutors.Signal>() () -> {
{ try {
@Override syncFromStorage();
public ScheduledExecutors.Signal call() }
{ catch (Exception e) {
try {
syncFromStorage();
}
catch (Exception e) {
if (active) {
log.makeAlert(e, "Failed to sync with storage").emit();
}
}
if (active) { if (active) {
return ScheduledExecutors.Signal.REPEAT; log.makeAlert(e, "Failed to sync with storage").emit();
} else {
return ScheduledExecutors.Signal.STOP;
} }
} }
if (active) {
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
}
} }
); );
requestManagement(); requestManagement();
@ -530,15 +519,12 @@ public class TaskQueue
Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(task, "task");
if (tasks.size() >= config.getMaxSize()) { if (tasks.size() >= config.getMaxSize()) {
throw DruidException.forPersona(DruidException.Persona.ADMIN) throw DruidException.forPersona(DruidException.Persona.ADMIN)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED) .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build( .build(
StringUtils.format( "Task queue already contains [%d] tasks."
"Too many tasks are in the queue (Limit = %d), " + + " Retry later or increase 'druid.indexer.queue.maxSize'[%d].",
"(Current active tasks = %d). Retry later or increase the druid.indexer.queue.maxSize", tasks.size(), config.getMaxSize()
config.getMaxSize(), );
tasks.size()
)
);
} }
// If this throws with any sort of exception, including TaskExistsException, we don't want to // If this throws with any sort of exception, including TaskExistsException, we don't want to
@ -954,6 +940,10 @@ public class TaskQueue
} }
} }
/**
* Gets the current status of this task either from the {@link TaskRunner}
* or from the {@link TaskStorage} (if not available with the TaskRunner).
*/
public Optional<TaskStatus> getTaskStatus(final String taskId) public Optional<TaskStatus> getTaskStatus(final String taskId)
{ {
RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId); RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);

View File

@ -44,6 +44,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -56,7 +57,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner; import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig;
@ -71,9 +71,6 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
@ -81,58 +78,68 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DefaultPasswordProvider; import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public class TaskQueueTest extends IngestionTestBase public class TaskQueueTest extends IngestionTestBase
{ {
private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY; private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;
/** private TaskActionClientFactory actionClientFactory;
* This test verifies releasing all locks of a task when it is not ready to run yet. private TaskQueue taskQueue;
* private StubServiceEmitter serviceEmitter;
* This test uses 2 APIs, {@link TaskQueue} APIs and {@link IngestionTestBase} APIs private Map<String, Object> defaultTaskContext;
* to emulate the scenario of deadlock. The IngestionTestBase provides low-leve APIs
* which you can manipulate {@link TaskLockbox} manually. These APIs should be used @Override
* only to emulate a certain deadlock scenario. All normal tasks should use TaskQueue public void setUpIngestionTestBase() throws IOException
* APIs. {
*/ super.setUpIngestionTestBase();
serviceEmitter = new StubServiceEmitter();
actionClientFactory = createActionClientFactory();
defaultTaskContext = new HashMap<>();
taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(3, null, null, null, null),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return defaultTaskContext;
}
},
getTaskStorage(),
new SimpleTaskRunner(),
actionClientFactory,
getLockbox(),
serviceEmitter,
getObjectMapper()
);
taskQueue.setActive();
}
@Test @Test
public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
// task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting
// to task2. // to task2.
final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
@ -166,20 +173,6 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testShutdownReleasesTaskLock() throws Exception public void testShutdownReleasesTaskLock() throws Exception
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
// Create a Task and add it to the TaskQueue // Create a Task and add it to the TaskQueue
final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M")); final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M"));
taskQueue.add(task); taskQueue.add(task);
@ -204,50 +197,27 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg()); Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
} }
@Test(expected = DruidException.class) @Test
public void testTaskErrorWhenExceptionIsThrownDueToQueueSize() public void testAddThrowsExceptionWhenQueueIsFull()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory(); // Fill up the queue
final TaskQueue taskQueue = new TaskQueue( for (int i = 0; i < 3; ++i) {
new TaskLockConfig(), taskQueue.add(new TestTask("t_" + i, Intervals.of("2021-01/P1M")));
new TaskQueueConfig(1, null, null, null, null), }
new DefaultTaskConfig(),
getTaskStorage(), // Verify that adding another task throws an exception
new SimpleTaskRunner(actionClientFactory), Assert.assertThrows(
actionClientFactory, DruidException.class,
getLockbox(), () -> taskQueue.add(new TestTask("tx", Intervals.of("2021-01/P1M")))
new NoopServiceEmitter(),
getObjectMapper()
); );
taskQueue.setActive(true);
// Create a Task and add it to the TaskQueue
final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M"));
taskQueue.add(task1);
// we will get exception here as taskQueue size is 1 druid.indexer.queue.maxSize is already 1
taskQueue.add(task2);
} }
@Test @Test
public void testSetUseLineageBasedSegmentAllocationByDefault() public void testAddedTaskUsesLineageBasedSegmentAllocationByDefault()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task); taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks(); final List<Task> tasks = taskQueue.getTasks();
Assert.assertEquals(1, tasks.size()); Assert.assertEquals(1, tasks.size());
final Task queuedTask = tasks.get(0); final Task queuedTask = tasks.get(0);
@ -259,29 +229,10 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory(); defaultTaskContext.put(
final TaskQueue taskQueue = new TaskQueue( SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
new TaskLockConfig(), false
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return ImmutableMap.of(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
false
);
}
},
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
); );
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task); taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks(); final List<Task> tasks = taskQueue.getTasks();
@ -295,19 +246,6 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask( final Task task = new TestTask(
"t1", "t1",
Intervals.of("2021-01-01/P1D"), Intervals.of("2021-01-01/P1D"),
@ -328,29 +266,7 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testLockConfigTakePrecedenceThanDefaultTaskContext() public void testLockConfigTakePrecedenceThanDefaultTaskContext()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory(); defaultTaskContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false);
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig()
{
@Override
public Map<String, Object> getContext()
{
return ImmutableMap.of(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
false
);
}
},
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task); taskQueue.add(task);
final List<Task> tasks = taskQueue.getTasks(); final List<Task> tasks = taskQueue.getTasks();
@ -362,19 +278,6 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testUserProvidedContextOverrideLockConfig() public void testUserProvidedContextOverrideLockConfig()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask( final Task task = new TestTask(
"t1", "t1",
Intervals.of("2021-01-01/P1D"), Intervals.of("2021-01-01/P1D"),
@ -391,21 +294,8 @@ public class TaskQueueTest extends IngestionTestBase
} }
@Test @Test
public void testTaskStatusWhenExceptionIsThrownInIsReady() public void testExceptionInIsReadyFailsTask()
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
getObjectMapper()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
{ {
@Override @Override
@ -422,7 +312,6 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode()); Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
Assert.assertNotNull(statusOptional.get().getErrorMsg()); Assert.assertNotNull(statusOptional.get().getErrorMsg());
Assert.assertTrue( Assert.assertTrue(
StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()),
statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run") statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run")
); );
} }
@ -430,9 +319,9 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedException public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedException
{ {
final TaskActionClientFactory actionClientFactory = createActionClientFactory(); final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner();
final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1")); taskRunner.start();
final StubServiceEmitter metricsVerifier = new StubServiceEmitter("druid/overlord", "testHost");
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()) EasyMock.expect(workerHolder.getWorker())
.andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)) .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY))
@ -450,15 +339,11 @@ public class TaskQueueTest extends IngestionTestBase
taskRunner, taskRunner,
actionClientFactory, actionClientFactory,
getLockbox(), getLockbox(),
metricsVerifier, serviceEmitter,
getObjectMapper() getObjectMapper()
); );
taskQueue.setActive(true); taskQueue.setActive();
final Task task = new TestTask( final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
"t1",
Intervals.of("2021-01-01/P1D"),
ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false)
);
taskQueue.add(task); taskQueue.add(task);
taskQueue.manageInternal(); taskQueue.manageInternal();
@ -483,7 +368,7 @@ public class TaskQueueTest extends IngestionTestBase
Thread.sleep(100); Thread.sleep(100);
// Verify that metrics are emitted on receiving announcement // Verify that metrics are emitted on receiving announcement
metricsVerifier.verifyEmitted("task/run/time", 1); serviceEmitter.verifyEmitted("task/run/time", 1);
CoordinatorRunStats stats = taskQueue.getQueueStats(); CoordinatorRunStats stats = taskQueue.getQueueStats();
Assert.assertEquals(0L, stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE)); Assert.assertEquals(0L, stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE));
Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES)); Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
@ -492,42 +377,46 @@ public class TaskQueueTest extends IngestionTestBase
@Test @Test
public void testGetTaskStatus() public void testGetTaskStatus()
{ {
final TaskRunner taskRunner = EasyMock.createMock(TaskRunner.class);
final TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
final String newTask = "newTask"; final String newTask = "newTask";
final String waitingTask = "waitingTask";
final String pendingTask = "pendingTask";
final String runningTask = "runningTask";
final String successfulTask = "successfulTask";
final String failedTask = "failedTask";
TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
EasyMock.expect(taskStorage.getStatus(newTask))
.andReturn(Optional.of(TaskStatus.running(newTask)));
EasyMock.expect(taskStorage.getStatus(successfulTask))
.andReturn(Optional.of(TaskStatus.success(successfulTask)));
EasyMock.expect(taskStorage.getStatus(failedTask))
.andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask)));
EasyMock.replay(taskStorage);
TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class);
EasyMock.expect(taskRunner.getRunnerTaskState(newTask)) EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
.andReturn(null); .andReturn(null);
EasyMock.expect(taskStorage.getStatus(newTask))
.andReturn(Optional.of(TaskStatus.running(newTask)));
final String waitingTask = "waitingTask";
EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask)) EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
.andReturn(RunnerTaskState.WAITING); .andReturn(RunnerTaskState.WAITING);
EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
.andReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
.andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskRunner.getTaskLocation(waitingTask)) EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
.andReturn(TaskLocation.unknown()); .andReturn(TaskLocation.unknown());
final String pendingTask = "pendingTask";
EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
.andReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getTaskLocation(pendingTask)) EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
.andReturn(TaskLocation.unknown()); .andReturn(TaskLocation.unknown());
final String runningTask = "runningTask";
EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
.andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getTaskLocation(runningTask)) EasyMock.expect(taskRunner.getTaskLocation(runningTask))
.andReturn(TaskLocation.create("host", 8100, 8100)); .andReturn(TaskLocation.create("host", 8100, 8100));
EasyMock.replay(taskRunner);
final String successfulTask = "successfulTask";
EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskStorage.getStatus(successfulTask))
.andReturn(Optional.of(TaskStatus.success(successfulTask)));
final String failedTask = "failedTask";
EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskStorage.getStatus(failedTask))
.andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask)));
EasyMock.replay(taskRunner, taskStorage);
final TaskQueue taskQueue = new TaskQueue( final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(), new TaskLockConfig(),
@ -535,12 +424,12 @@ public class TaskQueueTest extends IngestionTestBase
new DefaultTaskConfig(), new DefaultTaskConfig(),
taskStorage, taskStorage,
taskRunner, taskRunner,
createActionClientFactory(), actionClientFactory,
getLockbox(), getLockbox(),
new StubServiceEmitter("druid/overlord", "testHost"), serviceEmitter,
getObjectMapper() getObjectMapper()
); );
taskQueue.setActive(true); taskQueue.setActive();
Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get()); Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get());
Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get()); Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get());
@ -635,28 +524,17 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(taskInStorageAsString, taskInQueueAsString); Assert.assertEquals(taskInStorageAsString, taskInQueueAsString);
} }
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> runningTasks) private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
{ {
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery); .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery());
EasyMock.replay(druidNodeDiscoveryProvider); EasyMock.replay(druidNodeDiscoveryProvider);
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
for (String taskId : runningTasks) { return new HttpRemoteTaskRunner(
EasyMock.expect(taskStorageMock.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))); getObjectMapper(),
} new HttpRemoteTaskRunnerConfig(),
EasyMock.replay(taskStorageMock);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 3;
}
},
EasyMock.createNiceMock(HttpClient.class), EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new NoopProvisioningStrategy<>(), new NoopProvisioningStrategy<>(),
@ -664,35 +542,8 @@ public class TaskQueueTest extends IngestionTestBase
EasyMock.createNiceMock(TaskStorage.class), EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class), EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new StubServiceEmitter("druid/overlord", "testHost") serviceEmitter
); );
taskRunner.start();
taskRunner.registerListener(
new TaskRunnerListener()
{
@Override
public String getListenerId()
{
return "test-listener";
}
@Override
public void locationChanged(String taskId, TaskLocation newLocation)
{
// do nothing
}
@Override
public void statusChanged(String taskId, TaskStatus status)
{
// do nothing
}
},
Execs.directExecutor()
);
return taskRunner;
} }
private static class TestTask extends AbstractBatchIndexTask private static class TestTask extends AbstractBatchIndexTask
@ -774,111 +625,28 @@ public class TaskQueueTest extends IngestionTestBase
} }
} }
private static class SimpleTaskRunner implements TaskRunner private class SimpleTaskRunner extends SingleTaskBackgroundRunner
{ {
private final TaskActionClientFactory actionClientFactory; SimpleTaskRunner()
private SimpleTaskRunner(TaskActionClientFactory actionClientFactory)
{
this.actionClientFactory = actionClientFactory;
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return null;
}
@Override
public void start()
{
}
@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
}
@Override
public void unregisterListener(String listenerId)
{ {
super(
EasyMock.createMock(TaskToolboxFactory.class),
null,
serviceEmitter,
new DruidNode("overlord", "localhost", false, 8091, null, true, false),
null
);
} }
@Override @Override
public ListenableFuture<TaskStatus> run(Task task) public ListenableFuture<TaskStatus> run(Task task)
{ {
try { try {
final TaskToolbox toolbox = Mockito.mock(TaskToolbox.class); return Futures.immediateFuture(task.run(null));
Mockito.when(toolbox.getTaskActionClient()).thenReturn(actionClientFactory.create(task));
return Futures.immediateFuture(task.run(toolbox));
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Override
public void shutdown(String taskid, String reason)
{
}
@Override
public void stop()
{
}
@Override
public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
{
return null;
}
@Override
public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
{
return null;
}
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
return Collections.emptyList();
}
@Override
public Optional<ScalingStats> getScalingStats()
{
return null;
}
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
}
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
}
@Override
public Map<String, Long> getLazyTaskSlotCount()
{
return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
}
@Override
public Map<String, Long> getBlacklistedTaskSlotCount()
{
return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
}
} }
} }