From 82f443340d91cf2fe12d844e6df41482f6dddbca Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 25 Mar 2024 09:40:01 +0530 Subject: [PATCH] Clean up TaskQueueTest (#16187) Changes: - Remove redundant code from `TaskQueueTest` - Use lambdas in `TaskQueue` - Simplify error message when `TaskQueue` is full --- .../druid/indexing/overlord/TaskQueue.java | 86 ++-- .../indexing/overlord/TaskQueueTest.java | 460 +++++------------- 2 files changed, 152 insertions(+), 394 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index d3ca29d3abc..fb3f68c2394 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -73,7 +73,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -186,9 +185,9 @@ public class TaskQueue } @VisibleForTesting - void setActive(boolean active) + void setActive() { - this.active = active; + this.active = true; } /** @@ -211,31 +210,26 @@ public class TaskQueue "Shutting down forcefully as task failed to reacquire lock while becoming leader"); } managerExec.submit( - new Runnable() - { - @Override - public void run() - { - while (true) { + () -> { + while (true) { + try { + manage(); + break; + } + catch (InterruptedException e) { + log.info("Interrupted, exiting!"); + break; + } + catch (Exception e) { + final long restartDelay = config.getRestartDelay().getMillis(); + log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit(); try { - manage(); - break; + Thread.sleep(restartDelay); } - catch (InterruptedException e) { + catch (InterruptedException e2) { log.info("Interrupted, exiting!"); break; } - catch (Exception e) { - final long restartDelay = config.getRestartDelay().getMillis(); - log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit(); - try { - Thread.sleep(restartDelay); - } - catch (InterruptedException e2) { - log.info("Interrupted, exiting!"); - break; - } - } } } } @@ -243,25 +237,20 @@ public class TaskQueue ScheduledExecutors.scheduleAtFixedRate( storageSyncExec, config.getStorageSyncRate(), - new Callable() - { - @Override - public ScheduledExecutors.Signal call() - { - try { - syncFromStorage(); - } - catch (Exception e) { - if (active) { - log.makeAlert(e, "Failed to sync with storage").emit(); - } - } + () -> { + try { + syncFromStorage(); + } + catch (Exception e) { if (active) { - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; + log.makeAlert(e, "Failed to sync with storage").emit(); } } + if (active) { + return ScheduledExecutors.Signal.REPEAT; + } else { + return ScheduledExecutors.Signal.STOP; + } } ); requestManagement(); @@ -530,15 +519,12 @@ public class TaskQueue Preconditions.checkNotNull(task, "task"); if (tasks.size() >= config.getMaxSize()) { throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) - .build( - StringUtils.format( - "Too many tasks are in the queue (Limit = %d), " + - "(Current active tasks = %d). Retry later or increase the druid.indexer.queue.maxSize", - config.getMaxSize(), - tasks.size() - ) - ); + .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) + .build( + "Task queue already contains [%d] tasks." + + " Retry later or increase 'druid.indexer.queue.maxSize'[%d].", + tasks.size(), config.getMaxSize() + ); } // If this throws with any sort of exception, including TaskExistsException, we don't want to @@ -954,6 +940,10 @@ public class TaskQueue } } + /** + * Gets the current status of this task either from the {@link TaskRunner} + * or from the {@link TaskStorage} (if not available with the TaskRunner). + */ public Optional getTaskStatus(final String taskId) { RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 6a45cbb4c79..969a08abd03 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -44,6 +44,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -56,7 +57,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngesti import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; -import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; @@ -71,9 +71,6 @@ import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.http.client.HttpClient; @@ -81,58 +78,68 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.DefaultPasswordProvider; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.SQLMetadataConnector; -import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; import javax.annotation.Nullable; +import java.io.IOException; import java.net.URI; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; public class TaskQueueTest extends IngestionTestBase { private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY; - /** - * This test verifies releasing all locks of a task when it is not ready to run yet. - * - * This test uses 2 APIs, {@link TaskQueue} APIs and {@link IngestionTestBase} APIs - * to emulate the scenario of deadlock. The IngestionTestBase provides low-leve APIs - * which you can manipulate {@link TaskLockbox} manually. These APIs should be used - * only to emulate a certain deadlock scenario. All normal tasks should use TaskQueue - * APIs. - */ + private TaskActionClientFactory actionClientFactory; + private TaskQueue taskQueue; + private StubServiceEmitter serviceEmitter; + private Map defaultTaskContext; + + @Override + public void setUpIngestionTestBase() throws IOException + { + super.setUpIngestionTestBase(); + serviceEmitter = new StubServiceEmitter(); + actionClientFactory = createActionClientFactory(); + defaultTaskContext = new HashMap<>(); + + taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(3, null, null, null, null), + new DefaultTaskConfig() + { + @Override + public Map getContext() + { + return defaultTaskContext; + } + }, + getTaskStorage(), + new SimpleTaskRunner(), + actionClientFactory, + getLockbox(), + serviceEmitter, + getObjectMapper() + ); + taskQueue.setActive(); + } + @Test public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting // to task2. final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); @@ -166,20 +173,6 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testShutdownReleasesTaskLock() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); - // Create a Task and add it to the TaskQueue final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M")); taskQueue.add(task); @@ -204,50 +197,27 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg()); } - @Test(expected = DruidException.class) - public void testTaskErrorWhenExceptionIsThrownDueToQueueSize() + @Test + public void testAddThrowsExceptionWhenQueueIsFull() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(1, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() + // Fill up the queue + for (int i = 0; i < 3; ++i) { + taskQueue.add(new TestTask("t_" + i, Intervals.of("2021-01/P1M"))); + } + + // Verify that adding another task throws an exception + Assert.assertThrows( + DruidException.class, + () -> taskQueue.add(new TestTask("tx", Intervals.of("2021-01/P1M"))) ); - taskQueue.setActive(true); - - // Create a Task and add it to the TaskQueue - final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); - final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M")); - taskQueue.add(task1); - - // we will get exception here as taskQueue size is 1 druid.indexer.queue.maxSize is already 1 - taskQueue.add(task2); } @Test - public void testSetUseLineageBasedSegmentAllocationByDefault() + public void testAddedTaskUsesLineageBasedSegmentAllocationByDefault() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); taskQueue.add(task); + final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); final Task queuedTask = tasks.get(0); @@ -259,29 +229,10 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig() - { - @Override - public Map getContext() - { - return ImmutableMap.of( - SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, - false - ); - } - }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() + defaultTaskContext.put( + SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, + false ); - taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); taskQueue.add(task); final List tasks = taskQueue.getTasks(); @@ -295,19 +246,6 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), @@ -328,29 +266,7 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testLockConfigTakePrecedenceThanDefaultTaskContext() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig() - { - @Override - public Map getContext() - { - return ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ); - } - }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); + defaultTaskContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); taskQueue.add(task); final List tasks = taskQueue.getTasks(); @@ -362,19 +278,6 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testUserProvidedContextOverrideLockConfig() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), @@ -391,21 +294,8 @@ public class TaskQueueTest extends IngestionTestBase } @Test - public void testTaskStatusWhenExceptionIsThrownInIsReady() + public void testExceptionInIsReadyFailsTask() { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter(), - getObjectMapper() - ); - taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) { @Override @@ -422,7 +312,6 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode()); Assert.assertNotNull(statusOptional.get().getErrorMsg()); Assert.assertTrue( - StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()), statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run") ); } @@ -430,9 +319,9 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1")); - final StubServiceEmitter metricsVerifier = new StubServiceEmitter("druid/overlord", "testHost"); + final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(); + taskRunner.start(); + WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()) .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)) @@ -450,15 +339,11 @@ public class TaskQueueTest extends IngestionTestBase taskRunner, actionClientFactory, getLockbox(), - metricsVerifier, + serviceEmitter, getObjectMapper() ); - taskQueue.setActive(true); - final Task task = new TestTask( - "t1", - Intervals.of("2021-01-01/P1D"), - ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false) - ); + taskQueue.setActive(); + final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); taskQueue.add(task); taskQueue.manageInternal(); @@ -483,7 +368,7 @@ public class TaskQueueTest extends IngestionTestBase Thread.sleep(100); // Verify that metrics are emitted on receiving announcement - metricsVerifier.verifyEmitted("task/run/time", 1); + serviceEmitter.verifyEmitted("task/run/time", 1); CoordinatorRunStats stats = taskQueue.getQueueStats(); Assert.assertEquals(0L, stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE)); Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES)); @@ -492,42 +377,46 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testGetTaskStatus() { + final TaskRunner taskRunner = EasyMock.createMock(TaskRunner.class); + final TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class); + final String newTask = "newTask"; - final String waitingTask = "waitingTask"; - final String pendingTask = "pendingTask"; - final String runningTask = "runningTask"; - final String successfulTask = "successfulTask"; - final String failedTask = "failedTask"; - - TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class); - EasyMock.expect(taskStorage.getStatus(newTask)) - .andReturn(Optional.of(TaskStatus.running(newTask))); - EasyMock.expect(taskStorage.getStatus(successfulTask)) - .andReturn(Optional.of(TaskStatus.success(successfulTask))); - EasyMock.expect(taskStorage.getStatus(failedTask)) - .andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask))); - EasyMock.replay(taskStorage); - - TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class); EasyMock.expect(taskRunner.getRunnerTaskState(newTask)) .andReturn(null); + EasyMock.expect(taskStorage.getStatus(newTask)) + .andReturn(Optional.of(TaskStatus.running(newTask))); + + final String waitingTask = "waitingTask"; EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask)) .andReturn(RunnerTaskState.WAITING); - EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask)) - .andReturn(RunnerTaskState.PENDING); - EasyMock.expect(taskRunner.getRunnerTaskState(runningTask)) - .andReturn(RunnerTaskState.RUNNING); - EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask)) - .andReturn(RunnerTaskState.NONE); - EasyMock.expect(taskRunner.getRunnerTaskState(failedTask)) - .andReturn(RunnerTaskState.NONE); EasyMock.expect(taskRunner.getTaskLocation(waitingTask)) .andReturn(TaskLocation.unknown()); + + final String pendingTask = "pendingTask"; + EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask)) + .andReturn(RunnerTaskState.PENDING); EasyMock.expect(taskRunner.getTaskLocation(pendingTask)) .andReturn(TaskLocation.unknown()); + + final String runningTask = "runningTask"; + EasyMock.expect(taskRunner.getRunnerTaskState(runningTask)) + .andReturn(RunnerTaskState.RUNNING); EasyMock.expect(taskRunner.getTaskLocation(runningTask)) .andReturn(TaskLocation.create("host", 8100, 8100)); - EasyMock.replay(taskRunner); + + final String successfulTask = "successfulTask"; + EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask)) + .andReturn(RunnerTaskState.NONE); + EasyMock.expect(taskStorage.getStatus(successfulTask)) + .andReturn(Optional.of(TaskStatus.success(successfulTask))); + + final String failedTask = "failedTask"; + EasyMock.expect(taskRunner.getRunnerTaskState(failedTask)) + .andReturn(RunnerTaskState.NONE); + EasyMock.expect(taskStorage.getStatus(failedTask)) + .andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask))); + + EasyMock.replay(taskRunner, taskStorage); final TaskQueue taskQueue = new TaskQueue( new TaskLockConfig(), @@ -535,12 +424,12 @@ public class TaskQueueTest extends IngestionTestBase new DefaultTaskConfig(), taskStorage, taskRunner, - createActionClientFactory(), + actionClientFactory, getLockbox(), - new StubServiceEmitter("druid/overlord", "testHost"), + serviceEmitter, getObjectMapper() ); - taskQueue.setActive(true); + taskQueue.setActive(); Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get()); Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get()); @@ -635,28 +524,17 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals(taskInStorageAsString, taskInQueueAsString); } - private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTasks) + private HttpRemoteTaskRunner createHttpRemoteTaskRunner() { - HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery()); EasyMock.replay(druidNodeDiscoveryProvider); - TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class); - for (String taskId : runningTasks) { - EasyMock.expect(taskStorageMock.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))); - } - EasyMock.replay(taskStorageMock); - HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( - TestHelper.makeJsonMapper(), - new HttpRemoteTaskRunnerConfig() - { - @Override - public int getPendingTasksRunnerNumThreads() - { - return 3; - } - }, + + return new HttpRemoteTaskRunner( + getObjectMapper(), + new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(HttpClient.class), DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), new NoopProvisioningStrategy<>(), @@ -664,35 +542,8 @@ public class TaskQueueTest extends IngestionTestBase EasyMock.createNiceMock(TaskStorage.class), EasyMock.createNiceMock(CuratorFramework.class), new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), - new StubServiceEmitter("druid/overlord", "testHost") + serviceEmitter ); - - taskRunner.start(); - taskRunner.registerListener( - new TaskRunnerListener() - { - @Override - public String getListenerId() - { - return "test-listener"; - } - - @Override - public void locationChanged(String taskId, TaskLocation newLocation) - { - // do nothing - } - - @Override - public void statusChanged(String taskId, TaskStatus status) - { - // do nothing - } - }, - Execs.directExecutor() - ); - - return taskRunner; } private static class TestTask extends AbstractBatchIndexTask @@ -774,111 +625,28 @@ public class TaskQueueTest extends IngestionTestBase } } - private static class SimpleTaskRunner implements TaskRunner + private class SimpleTaskRunner extends SingleTaskBackgroundRunner { - private final TaskActionClientFactory actionClientFactory; - - private SimpleTaskRunner(TaskActionClientFactory actionClientFactory) - { - this.actionClientFactory = actionClientFactory; - } - - @Override - public List>> restore() - { - return null; - } - - @Override - public void start() - { - } - - @Override - public void registerListener(TaskRunnerListener listener, Executor executor) - { - } - - @Override - public void unregisterListener(String listenerId) + SimpleTaskRunner() { + super( + EasyMock.createMock(TaskToolboxFactory.class), + null, + serviceEmitter, + new DruidNode("overlord", "localhost", false, 8091, null, true, false), + null + ); } @Override public ListenableFuture run(Task task) { try { - final TaskToolbox toolbox = Mockito.mock(TaskToolbox.class); - Mockito.when(toolbox.getTaskActionClient()).thenReturn(actionClientFactory.create(task)); - return Futures.immediateFuture(task.run(toolbox)); + return Futures.immediateFuture(task.run(null)); } catch (Exception e) { throw new RuntimeException(e); } } - - @Override - public void shutdown(String taskid, String reason) - { - } - - @Override - public void stop() - { - } - - @Override - public Collection getRunningTasks() - { - return null; - } - - @Override - public Collection getPendingTasks() - { - return null; - } - - @Override - public Collection getKnownTasks() - { - return Collections.emptyList(); - } - - @Override - public Optional getScalingStats() - { - return null; - } - - @Override - public Map getTotalTaskSlotCount() - { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); - } - - @Override - public Map getIdleTaskSlotCount() - { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); - } - - @Override - public Map getUsedTaskSlotCount() - { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); - } - - @Override - public Map getLazyTaskSlotCount() - { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); - } - - @Override - public Map getBlacklistedTaskSlotCount() - { - return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); - } } }