From b1edf4a5b407533dced70e172ddd66fd11170783 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 19 Jul 2024 05:00:23 -0700 Subject: [PATCH] Refactor: Clean up Overlord guice dependencies (#16752) Description: Overlord guice dependencies are currently a little difficult to plug into. This was encountered while working on a separate PR where a class needed to depend on `TaskMaster.getTaskQueue()` to query some task related info but this class itself needs to be a dependency of `TaskMaster` so that it can be registered to the leader lifecycle. The approach taken here is to simply decouple the leadership lifecycle of the overlord from manipulation or querying of its state. Changes: - No functional change - Add new class `DruidOverlord` to contain leadership logic after the model of `DruidCoordinator` - The new class `DruidOverlord` should not be a dependency of any class with the exception of REST endpoint `*Resource` classes. - All classes that need to listen to leadership changes must be a dependency of `DruidOverlord` so that they can be registered to the leadership lifecycle. - Move all querying logic from `OverlordResource` to `TaskQueryTool` so that other classes can leverage this logic too (required for follow up PR). - Update tests --- .../indexing/overlord/DruidOverlord.java | 261 +++++++++++++ .../druid/indexing/overlord/TaskMaster.java | 239 +----------- .../indexing/overlord/TaskQueryTool.java | 279 +++++++++++++- .../overlord/http/OverlordRedirectInfo.java | 12 +- .../overlord/http/OverlordResource.java | 360 +++--------------- .../overlord/http/TaskStateLookup.java | 42 ++ .../overlord/OverlordBlinkLeadershipTest.java | 2 +- .../indexing/overlord/TaskLifecycleTest.java | 16 +- .../http/OverlordRedirectInfoTest.java | 38 +- .../overlord/http/OverlordResourceTest.java | 167 ++++---- .../indexing/overlord/http/OverlordTest.java | 27 +- .../org/apache/druid/cli/CliOverlord.java | 6 +- .../cli/CoordinatorOverlordRedirectInfo.java | 6 +- .../CoordinatorOverlordRedirectInfoTest.java | 53 +++ 14 files changed, 826 insertions(+), 682 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java create mode 100644 services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java new file mode 100644 index 00000000000..f99189d3589 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.inject.Inject; +import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.curator.discovery.ServiceAnnouncer; +import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.task.TaskContextEnricher; +import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Encapsulates the leadership lifecycle of the Druid Overlord service. + * No classes other than Resource endpoints should have this class as a dependency. + * To query the current state of the Overlord, use {@link TaskMaster} instead. + */ +public class DruidOverlord +{ + private static final EmittingLogger log = new EmittingLogger(DruidOverlord.class); + + private final DruidLeaderSelector overlordLeaderSelector; + private final DruidLeaderSelector.Listener leadershipListener; + + private final ReentrantLock giant = new ReentrantLock(true); + + private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); + + /** + * Indicates that all services have been started and the node can now announce + * itself with {@link ServiceAnnouncer#announce}. This must be set to false + * as soon as {@link DruidLeaderSelector.Listener#stopBeingLeader()} is + * called. + */ + private volatile boolean initialized; + + @Inject + public DruidOverlord( + final TaskMaster taskMaster, + final TaskLockConfig taskLockConfig, + final TaskQueueConfig taskQueueConfig, + final DefaultTaskConfig defaultTaskConfig, + final TaskLockbox taskLockbox, + final TaskStorage taskStorage, + final TaskActionClientFactory taskActionClientFactory, + @Self final DruidNode selfNode, + final TaskRunnerFactory runnerFactory, + final ServiceAnnouncer serviceAnnouncer, + final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, + final ServiceEmitter emitter, + final SupervisorManager supervisorManager, + final OverlordDutyExecutor overlordDutyExecutor, + @IndexingService final DruidLeaderSelector overlordLeaderSelector, + final SegmentAllocationQueue segmentAllocationQueue, + final ObjectMapper mapper, + final TaskContextEnricher taskContextEnricher + ) + { + this.overlordLeaderSelector = overlordLeaderSelector; + + final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : + selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); + + this.leadershipListener = new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + giant.lock(); + + // I AM THE MASTER OF THE UNIVERSE. + log.info("By the power of Grayskull, I have the power!"); + + try { + final TaskRunner taskRunner = runnerFactory.build(); + final TaskQueue taskQueue = new TaskQueue( + taskLockConfig, + taskQueueConfig, + defaultTaskConfig, + taskStorage, + taskRunner, + taskActionClientFactory, + taskLockbox, + emitter, + mapper, + taskContextEnricher + ); + + // Sensible order to start stuff: + final Lifecycle leaderLifecycle = new Lifecycle("task-master"); + if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { + log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") + .emit(); + } + + leaderLifecycle.addManagedInstance(taskRunner); + leaderLifecycle.addManagedInstance(taskQueue); + leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordDutyExecutor); + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() + { + segmentAllocationQueue.becomeLeader(); + taskMaster.becomeLeader(taskRunner, taskQueue); + + // Announce the node only after all the services have been initialized + initialized = true; + serviceAnnouncer.announce(node); + } + + @Override + public void stop() + { + serviceAnnouncer.unannounce(node); + taskMaster.stopBeingLeader(); + segmentAllocationQueue.stopBeingLeader(); + } + } + ); + + leaderLifecycle.start(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + giant.unlock(); + } + } + + @Override + public void stopBeingLeader() + { + giant.lock(); + try { + initialized = false; + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + + if (leaderLifecycle != null) { + leaderLifecycle.stop(); + } + } + finally { + giant.unlock(); + } + } + }; + } + + /** + * Starts waiting for leadership. + * Should be called only once throughout the life of the service. + */ + @LifecycleStart + public void start() + { + giant.lock(); + + try { + overlordLeaderSelector.registerListener(leadershipListener); + } + finally { + giant.unlock(); + } + } + + /** + * Stops forever (not just this particular leadership session). + * Should be called only once throughout the life of the service. + */ + @LifecycleStop + public void stop() + { + giant.lock(); + + try { + gracefulStopLeaderLifecycle(); + overlordLeaderSelector.unregisterListener(); + } + finally { + giant.unlock(); + } + } + + /** + * @return true if it's the leader and all its services have been initialized. + */ + public boolean isLeader() + { + return overlordLeaderSelector.isLeader() && initialized; + } + + public String getCurrentLeader() + { + return overlordLeaderSelector.getCurrentLeader(); + } + + public Optional getRedirectLocation() + { + String leader = overlordLeaderSelector.getCurrentLeader(); + // do not redirect when + // leader is not elected + // leader is the current node + if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) { + return Optional.absent(); + } else { + return Optional.of(leader); + } + } + + private void gracefulStopLeaderLifecycle() + { + try { + if (isLeader()) { + leadershipListener.stopBeingLeader(); + } + } + catch (Exception ex) { + // fail silently since we are stopping anyway + } + } + +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 5103f9bd87e..1de95352504 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -19,254 +19,61 @@ package org.apache.druid.indexing.overlord; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.inject.Inject; -import org.apache.druid.client.indexing.IndexingService; -import org.apache.druid.curator.discovery.ServiceAnnouncer; -import org.apache.druid.discovery.DruidLeaderSelector; -import org.apache.druid.discovery.DruidLeaderSelector.Listener; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskContextEnricher; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; -import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; -import org.apache.druid.indexing.overlord.config.TaskLockConfig; -import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.java.util.common.lifecycle.Lifecycle; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.metrics.TaskCountStatsProvider; import org.apache.druid.server.metrics.TaskSlotCountStatsProvider; import javax.annotation.Nullable; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; /** - * Encapsulates the indexer leadership lifecycle. + * Encapsulates various Overlord classes that allow querying and updating the + * current state of the Overlord leader. */ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider { - private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); - - private final DruidLeaderSelector overlordLeaderSelector; - private final DruidLeaderSelector.Listener leadershipListener; - - private final ReentrantLock giant = new ReentrantLock(true); private final TaskActionClientFactory taskActionClientFactory; private final SupervisorManager supervisorManager; - - private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); - private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; - /** - * This flag indicates that all services has been started and should be true before calling - * {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is - * called. - */ - private volatile boolean initialized; + private final AtomicBoolean isLeader = new AtomicBoolean(false); @Inject public TaskMaster( - final TaskLockConfig taskLockConfig, - final TaskQueueConfig taskQueueConfig, - final DefaultTaskConfig defaultTaskConfig, - final TaskLockbox taskLockbox, - final TaskStorage taskStorage, - final TaskActionClientFactory taskActionClientFactory, - @Self final DruidNode selfNode, - final TaskRunnerFactory runnerFactory, - final ServiceAnnouncer serviceAnnouncer, - final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, - final ServiceEmitter emitter, - final SupervisorManager supervisorManager, - final OverlordDutyExecutor overlordDutyExecutor, - @IndexingService final DruidLeaderSelector overlordLeaderSelector, - final SegmentAllocationQueue segmentAllocationQueue, - final ObjectMapper mapper, - final TaskContextEnricher taskContextEnricher + TaskActionClientFactory taskActionClientFactory, + SupervisorManager supervisorManager ) { - this.supervisorManager = supervisorManager; this.taskActionClientFactory = taskActionClientFactory; - - this.overlordLeaderSelector = overlordLeaderSelector; - - final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : - selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); - - this.leadershipListener = new DruidLeaderSelector.Listener() - { - @Override - public void becomeLeader() - { - giant.lock(); - - // I AM THE MASTER OF THE UNIVERSE. - log.info("By the power of Grayskull, I have the power!"); - - try { - taskRunner = runnerFactory.build(); - taskQueue = new TaskQueue( - taskLockConfig, - taskQueueConfig, - defaultTaskConfig, - taskStorage, - taskRunner, - taskActionClientFactory, - taskLockbox, - emitter, - mapper, - taskContextEnricher - ); - - // Sensible order to start stuff: - final Lifecycle leaderLifecycle = new Lifecycle("task-master"); - if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { - log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") - .emit(); - } - - leaderLifecycle.addManagedInstance(taskRunner); - leaderLifecycle.addManagedInstance(taskQueue); - leaderLifecycle.addManagedInstance(supervisorManager); - leaderLifecycle.addManagedInstance(overlordDutyExecutor); - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() - { - segmentAllocationQueue.becomeLeader(); - } - - @Override - public void stop() - { - segmentAllocationQueue.stopBeingLeader(); - } - } - ); - - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() - { - initialized = true; - serviceAnnouncer.announce(node); - } - - @Override - public void stop() - { - serviceAnnouncer.unannounce(node); - } - } - ); - - leaderLifecycle.start(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - giant.unlock(); - } - } - - @Override - public void stopBeingLeader() - { - giant.lock(); - try { - initialized = false; - final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); - - if (leaderLifecycle != null) { - leaderLifecycle.stop(); - } - } - finally { - giant.unlock(); - } - } - }; + this.supervisorManager = supervisorManager; } - /** - * Starts waiting for leadership. Should only be called once throughout the life of the program. - */ - @LifecycleStart - public void start() + public void becomeLeader(TaskRunner taskRunner, TaskQueue taskQueue) { - giant.lock(); - - try { - overlordLeaderSelector.registerListener(leadershipListener); - } - finally { - giant.unlock(); - } + this.taskRunner = taskRunner; + this.taskQueue = taskQueue; + isLeader.set(true); } - /** - * Stops forever (not just this particular leadership session). Should only be called once throughout the life of - * the program. - */ - @LifecycleStop - public void stop() + public void stopBeingLeader() { - giant.lock(); - - try { - gracefulStopLeaderLifecycle(); - overlordLeaderSelector.unregisterListener(); - } - finally { - giant.unlock(); - } + isLeader.set(false); + this.taskQueue = null; + this.taskRunner = null; } - /** - * Returns true if it's the leader and all its services have been initialized. - */ - public boolean isLeader() + private boolean isLeader() { - return overlordLeaderSelector.isLeader() && initialized; - } - - public String getCurrentLeader() - { - return overlordLeaderSelector.getCurrentLeader(); - } - - public Optional getRedirectLocation() - { - String leader = overlordLeaderSelector.getCurrentLeader(); - // do not redirect when - // leader is not elected - // leader is the current node - if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) { - return Optional.absent(); - } else { - return Optional.of(leader); - } + return isLeader.get(); } public Optional getTaskRunner() @@ -380,18 +187,6 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } } - private void gracefulStopLeaderLifecycle() - { - try { - if (isLeader()) { - leadershipListener.stopBeingLeader(); - } - } - catch (Exception ex) { - // fail silently since we are stopping anyway - } - } - @Override @Nullable public Map getTotalTaskSlotCount() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 29ca16f5aa9..f5351d7c6e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -20,19 +20,37 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import org.apache.druid.indexing.overlord.http.TaskStateLookup; +import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Provides read-only methods to fetch information related to tasks. @@ -42,16 +60,28 @@ import java.util.Map; */ public class TaskQueryTool { + private static final Logger log = new Logger(TaskQueryTool.class); + private final TaskStorage storage; private final TaskLockbox taskLockbox; private final TaskMaster taskMaster; + private final JacksonConfigManager configManager; + private final ProvisioningStrategy provisioningStrategy; @Inject - public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) + public TaskQueryTool( + TaskStorage storage, + TaskLockbox taskLockbox, + TaskMaster taskMaster, + ProvisioningStrategy provisioningStrategy, + JacksonConfigManager configManager + ) { this.storage = storage; this.taskLockbox = taskLockbox; this.taskMaster = taskMaster; + this.configManager = configManager; + this.provisioningStrategy = provisioningStrategy; } /** @@ -81,13 +111,10 @@ public class TaskQueryTool public List> getActiveTaskInfo(@Nullable String dataSource) { - return storage.getTaskInfos( - TaskLookup.activeTasksOnly(), - dataSource - ); + return storage.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource); } - public List getTaskStatusPlusList( + private List getTaskStatusPlusList( Map taskLookups, @Nullable String dataSource ) @@ -107,9 +134,14 @@ public class TaskQueryTool return storage.getTask(taskId); } - public Optional getStatus(final String taskId) + public Optional getTaskStatus(final String taskId) { - return storage.getStatus(taskId); + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getTaskStatus(taskId); + } else { + return storage.getStatus(taskId); + } } @Nullable @@ -118,4 +150,235 @@ public class TaskQueryTool return storage.getTaskInfo(taskId); } + public List getTaskStatusPlusList( + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String createdTimeInterval, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + Optional taskRunnerOptional = taskMaster.getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + return Collections.emptyList(); + } + final TaskRunner taskRunner = taskRunnerOptional.get(); + + final Duration createdTimeDuration; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); + createdTimeDuration = theInterval.toDuration(); + } else { + createdTimeDuration = null; + } + + // Ideally, snapshotting in taskStorage and taskRunner should be done atomically, + // but there is no way to do it today. + // Instead, we first gets a snapshot from taskStorage and then one from taskRunner. + // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process + // and use the snapshot from taskRunner as a reference for potential task state updates happened + // after the first snapshotting. + Stream taskStatusPlusStream = getTaskStatusPlusList( + state, + dataSource, + createdTimeDuration, + maxCompletedTasks, + type + ); + final Map runnerWorkItems = getTaskRunnerWorkItems( + taskRunner, + state, + dataSource, + type + ); + + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { + // We are interested in only those tasks which are in taskRunner. + taskStatusPlusStream = taskStatusPlusStream + .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); + } + final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); + + // Separate complete and active tasks from taskStorage. + // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. + final List completeTaskStatusPlusList = new ArrayList<>(); + final List activeTaskStatusPlusList = new ArrayList<>(); + for (TaskStatusPlus statusPlus : taskStatusPlusList) { + if (statusPlus.getStatusCode().isComplete()) { + completeTaskStatusPlusList.add(statusPlus); + } else { + activeTaskStatusPlusList.add(statusPlus); + } + } + + final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); + + activeTaskStatusPlusList.forEach(statusPlus -> { + final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); + if (runnerWorkItem == null) { + // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. + if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { + taskStatuses.add(statusPlus); + } + } else { + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { + taskStatuses.add( + new TaskStatusPlus( + statusPlus.getId(), + statusPlus.getGroupId(), + statusPlus.getType(), + statusPlus.getCreatedTime(), + runnerWorkItem.getQueueInsertionTime(), + statusPlus.getStatusCode(), + taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner + statusPlus.getDuration(), + runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. + statusPlus.getDataSource(), + statusPlus.getErrorMsg() + ) + ); + } + } + }); + + return taskStatuses; + } + + private Stream getTaskStatusPlusList( + TaskStateLookup state, + @Nullable String dataSource, + Duration createdTimeDuration, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + final Map taskLookups; + switch (state) { + case ALL: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + TaskLookup.ActiveTaskLookup.getInstance(), + TaskLookupType.COMPLETE, + TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case COMPLETE: + taskLookups = ImmutableMap.of( + TaskLookupType.COMPLETE, + TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case WAITING: + case PENDING: + case RUNNING: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + TaskLookup.ActiveTaskLookup.getInstance() + ); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + + final Stream taskStatusPlusStream = getTaskStatusPlusList( + taskLookups, + dataSource + ).stream(); + if (type != null) { + return taskStatusPlusStream.filter( + statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) + ); + } else { + return taskStatusPlusStream; + } + } + + private Map getTaskRunnerWorkItems( + TaskRunner taskRunner, + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String type + ) + { + Stream runnerWorkItemsStream; + switch (state) { + case ALL: + case WAITING: + // waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner) + runnerWorkItemsStream = taskRunner.getKnownTasks().stream(); + break; + case PENDING: + runnerWorkItemsStream = taskRunner.getPendingTasks().stream(); + break; + case RUNNING: + runnerWorkItemsStream = taskRunner.getRunningTasks().stream(); + break; + case COMPLETE: + runnerWorkItemsStream = Stream.empty(); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + if (dataSource != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource())); + } + if (type != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType())); + } + return runnerWorkItemsStream + .collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item)); + } + + public TotalWorkerCapacityResponse getTotalWorkerCapacity() + { + Optional taskRunnerOptional = taskMaster.getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + return null; + } + TaskRunner taskRunner = taskRunnerOptional.get(); + + Collection workers = taskRunner instanceof WorkerTaskRunner ? + ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); + + int currentCapacity = taskRunner.getTotalCapacity(); + int usedCapacity = taskRunner.getUsedCapacity(); + // Calculate maximum capacity with auto scale + int maximumCapacity; + WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } else { + // Auto-scale is not using DefaultWorkerBehaviorConfig + log.debug( + "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", + workerBehaviorConfig, + workerBehaviorConfig.getClass().getSimpleName() + ); + maximumCapacity = -1; + } + + return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity); + } + + public WorkerBehaviorConfig getLatestWorkerConfig() + { + return configManager.watch( + WorkerBehaviorConfig.CONFIG_KEY, + WorkerBehaviorConfig.class + ).get(); + } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java index 4e332f599df..41e1ef8b7c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java @@ -22,7 +22,7 @@ package org.apache.druid.indexing.overlord.http; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.RedirectInfo; @@ -38,25 +38,25 @@ public class OverlordRedirectInfo implements RedirectInfo "/druid/indexer/v1/isLeader" ); - private final TaskMaster taskMaster; + private final DruidOverlord overlord; @Inject - public OverlordRedirectInfo(TaskMaster taskMaster) + public OverlordRedirectInfo(DruidOverlord overlord) { - this.taskMaster = taskMaster; + this.overlord = overlord; } @Override public boolean doLocal(String requestURI) { - return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || taskMaster.isLeader(); + return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || overlord.isLeader(); } @Override public URL getRedirectURL(String queryString, String requestURI) { try { - final Optional redirectLocation = taskMaster.getRedirectLocation(); + final Optional redirectLocation = overlord.getRedirectLocation(); if (!redirectLocation.isPresent()) { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f945cc95c1e..54ada7cb2b4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -33,6 +33,7 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; @@ -44,31 +45,23 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; -import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; -import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLookup; -import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; -import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; -import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -84,7 +77,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.joda.time.Duration; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -104,17 +97,12 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * @@ -125,6 +113,7 @@ public class OverlordResource private static final Logger log = new Logger(OverlordResource.class); private final TaskMaster taskMaster; + private final DruidOverlord overlord; private final TaskQueryTool taskQueryTool; private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private final TaskLogStreamer taskLogStreamer; @@ -132,35 +121,16 @@ public class OverlordResource private final AuditManager auditManager; private final AuthorizerMapper authorizerMapper; private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter; - private final ProvisioningStrategy provisioningStrategy; private final AuthConfig authConfig; - private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); private static final Set AUDITED_TASK_TYPES = ImmutableSet.of("index", "index_parallel", "compact", "index_hadoop", "kill"); - private enum TaskStateLookup - { - ALL, - WAITING, - PENDING, - RUNNING, - COMPLETE; - - private static TaskStateLookup fromString(@Nullable String state) - { - if (state == null) { - return ALL; - } else { - return TaskStateLookup.valueOf(StringUtils.toUpperCase(state)); - } - } - } - @Inject public OverlordResource( + DruidOverlord overlord, TaskMaster taskMaster, TaskQueryTool taskQueryTool, IndexerMetadataStorageAdapter indexerMetadataStorageAdapter, @@ -169,10 +139,10 @@ public class OverlordResource AuditManager auditManager, AuthorizerMapper authorizerMapper, WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter, - ProvisioningStrategy provisioningStrategy, AuthConfig authConfig ) { + this.overlord = overlord; this.taskMaster = taskMaster; this.taskQueryTool = taskQueryTool; this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter; @@ -181,7 +151,6 @@ public class OverlordResource this.auditManager = auditManager; this.authorizerMapper = authorizerMapper; this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter; - this.provisioningStrategy = provisioningStrategy; this.authConfig = authConfig; } @@ -252,7 +221,7 @@ public class OverlordResource @Produces(MediaType.APPLICATION_JSON) public Response getLeader() { - return Response.ok(taskMaster.getCurrentLeader()).build(); + return Response.ok(overlord.getCurrentLeader()).build(); } /** @@ -263,7 +232,7 @@ public class OverlordResource @Produces(MediaType.APPLICATION_JSON) public Response isLeader() { - final boolean leading = taskMaster.isLeader(); + final boolean leading = overlord.isLeader(); final Map response = ImmutableMap.of("leader", leading); if (leading) { return Response.ok(response).build(); @@ -373,9 +342,7 @@ public class OverlordResource taskInfo.getStatus().getStatusCode(), RunnerTaskState.WAITING, taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation() == null - ? TaskLocation.unknown() - : taskInfo.getStatus().getLocation(), + Configs.valueOrDefault(taskInfo.getStatus().getLocation(), TaskLocation.unknown()), taskInfo.getDataSource(), taskInfo.getStatus().getErrorMsg() ) @@ -415,14 +382,9 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - taskQueue.shutdown(taskid, "Shutdown request from user"); - return Response.ok(ImmutableMap.of("task", taskid)).build(); - } + taskQueue -> { + taskQueue.shutdown(taskid, "Shutdown request from user"); + return Response.ok(ImmutableMap.of("task", taskid)).build(); } ); } @@ -435,20 +397,15 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - final List> tasks = taskQueryTool.getActiveTaskInfo(dataSource); - if (tasks.isEmpty()) { - return Response.status(Status.NOT_FOUND).build(); - } else { - for (final TaskInfo task : tasks) { - taskQueue.shutdown(task.getId(), "Shutdown request from user"); - } - return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); + taskQueue -> { + final List> tasks = taskQueryTool.getActiveTaskInfo(dataSource); + if (tasks.isEmpty()) { + return Response.status(Status.NOT_FOUND).build(); + } else { + for (final TaskInfo task : tasks) { + taskQueue.shutdown(task.getId(), "Shutdown request from user"); } + return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); } } ); @@ -460,19 +417,13 @@ public class OverlordResource @ResourceFilters(StateResourceFilter.class) public Response getMultipleTaskStatuses(Set taskIds) { - if (taskIds == null || taskIds.size() == 0) { - return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build(); + if (CollectionUtils.isNullOrEmpty(taskIds)) { + return Response.status(Response.Status.BAD_REQUEST).entity("No Task IDs provided.").build(); } - final Optional taskQueue = taskMaster.getTaskQueue(); - Map result = Maps.newHashMapWithExpectedSize(taskIds.size()); + final Map result = Maps.newHashMapWithExpectedSize(taskIds.size()); for (String taskId : taskIds) { - final Optional optional; - if (taskQueue.isPresent()) { - optional = taskQueue.get().getTaskStatus(taskId); - } else { - optional = taskQueryTool.getStatus(taskId); - } + final Optional optional = taskQueryTool.getTaskStatus(taskId); if (optional.isPresent()) { result.put(taskId, optional.get()); } @@ -487,11 +438,7 @@ public class OverlordResource @ResourceFilters(ConfigResourceFilter.class) public Response getWorkerConfig() { - if (workerConfigRef == null) { - workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class); - } - - return Response.ok(workerConfigRef.get()).build(); + return Response.ok(taskQueryTool.getLatestWorkerConfig()).build(); } /** @@ -503,49 +450,11 @@ public class OverlordResource @ResourceFilters(ConfigResourceFilter.class) public Response getTotalWorkerCapacity() { - // Calculate current cluster capacity - Optional taskRunnerOptional = taskMaster.getTaskRunner(); - if (!taskRunnerOptional.isPresent()) { - // Cannot serve call as not leader + if (overlord.isLeader()) { + return Response.ok(taskQueryTool.getTotalWorkerCapacity()).build(); + } else { return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } - TaskRunner taskRunner = taskRunnerOptional.get(); - Collection workers = taskRunner instanceof WorkerTaskRunner ? - ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); - - int currentCapacity = taskRunner.getTotalCapacity(); - int usedCapacity = taskRunner.getUsedCapacity(); - // Calculate maximum capacity with auto scale - int maximumCapacity; - if (workerConfigRef == null) { - workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class); - } - WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); - if (workerBehaviorConfig == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); - maximumCapacity = -1; - } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { - DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; - if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); - maximumCapacity = -1; - } else { - int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); - int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); - maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; - } - } else { - // Auto scale is not using DefaultWorkerBehaviorConfig - log.debug( - "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", - workerBehaviorConfig, - workerBehaviorConfig.getClass().getSimpleName() - ); - maximumCapacity = -1; - } - return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build(); } // default value is used for backwards compatibility @@ -693,9 +602,13 @@ public class OverlordResource //check for valid state if (state != null) { if (!API_TASK_STATES.contains(StringUtils.toLowerCase(state))) { + String errorMessage = StringUtils.format( + "Invalid task state[%s]. Must be one of %s.", + state, API_TASK_STATES + ); return Response.status(Status.BAD_REQUEST) .type(MediaType.TEXT_PLAIN) - .entity(StringUtils.format("Invalid state : %s, valid values are: %s", state, API_TASK_STATES)) + .entity(errorMessage) .build(); } } @@ -725,8 +638,7 @@ public class OverlordResource taskMaster.getTaskRunner(), taskRunner -> { final List authorizedList = securedTaskStatusPlus( - getTaskStatusPlusList( - taskRunner, + taskQueryTool.getTaskStatusPlusList( TaskStateLookup.fromString(state), dataSource, createdTimeInterval, @@ -741,180 +653,6 @@ public class OverlordResource ); } - private List getTaskStatusPlusList( - TaskRunner taskRunner, - TaskStateLookup state, - @Nullable String dataSource, - @Nullable String createdTimeInterval, - @Nullable Integer maxCompletedTasks, - @Nullable String type - ) - { - final Duration createdTimeDuration; - if (createdTimeInterval != null) { - final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); - createdTimeDuration = theInterval.toDuration(); - } else { - createdTimeDuration = null; - } - - // Ideally, snapshotting in taskStorage and taskRunner should be done atomically, - // but there is no way to do it today. - // Instead, we first gets a snapshot from taskStorage and then one from taskRunner. - // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process - // and use the snapshot from taskRunner as a reference for potential task state updates happened - // after the first snapshotting. - Stream taskStatusPlusStream = getTaskStatusPlusList( - state, - dataSource, - createdTimeDuration, - maxCompletedTasks, - type - ); - final Map runnerWorkItems = getTaskRunnerWorkItems( - taskRunner, - state, - dataSource, - type - ); - - if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { - // We are interested in only those tasks which are in taskRunner. - taskStatusPlusStream = taskStatusPlusStream - .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); - } - final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); - - // Separate complete and active tasks from taskStorage. - // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. - final List completeTaskStatusPlusList = new ArrayList<>(); - final List activeTaskStatusPlusList = new ArrayList<>(); - for (TaskStatusPlus statusPlus : taskStatusPlusList) { - if (statusPlus.getStatusCode().isComplete()) { - completeTaskStatusPlusList.add(statusPlus); - } else { - activeTaskStatusPlusList.add(statusPlus); - } - } - - final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); - - activeTaskStatusPlusList.forEach(statusPlus -> { - final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); - if (runnerWorkItem == null) { - // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. - if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { - taskStatuses.add(statusPlus); - } - } else { - if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { - taskStatuses.add( - new TaskStatusPlus( - statusPlus.getId(), - statusPlus.getGroupId(), - statusPlus.getType(), - statusPlus.getCreatedTime(), - runnerWorkItem.getQueueInsertionTime(), - statusPlus.getStatusCode(), - taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner - statusPlus.getDuration(), - runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. - statusPlus.getDataSource(), - statusPlus.getErrorMsg() - ) - ); - } - } - }); - - return taskStatuses; - } - - private Stream getTaskStatusPlusList( - TaskStateLookup state, - @Nullable String dataSource, - Duration createdTimeDuration, - @Nullable Integer maxCompletedTasks, - @Nullable String type - ) - { - final Map taskLookups; - switch (state) { - case ALL: - taskLookups = ImmutableMap.of( - TaskLookupType.ACTIVE, - ActiveTaskLookup.getInstance(), - TaskLookupType.COMPLETE, - CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) - ); - break; - case COMPLETE: - taskLookups = ImmutableMap.of( - TaskLookupType.COMPLETE, - CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) - ); - break; - case WAITING: - case PENDING: - case RUNNING: - taskLookups = ImmutableMap.of( - TaskLookupType.ACTIVE, - ActiveTaskLookup.getInstance() - ); - break; - default: - throw new IAE("Unknown state: [%s]", state); - } - - final Stream taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList( - taskLookups, - dataSource - ).stream(); - if (type != null) { - return taskStatusPlusStream.filter( - statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) - ); - } else { - return taskStatusPlusStream; - } - } - - private Map getTaskRunnerWorkItems( - TaskRunner taskRunner, - TaskStateLookup state, - @Nullable String dataSource, - @Nullable String type - ) - { - Stream runnerWorkItemsStream; - switch (state) { - case ALL: - case WAITING: - // waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner) - runnerWorkItemsStream = taskRunner.getKnownTasks().stream(); - break; - case PENDING: - runnerWorkItemsStream = taskRunner.getPendingTasks().stream(); - break; - case RUNNING: - runnerWorkItemsStream = taskRunner.getRunningTasks().stream(); - break; - case COMPLETE: - runnerWorkItemsStream = Stream.empty(); - break; - default: - throw new IAE("Unknown state: [%s]", state); - } - if (dataSource != null) { - runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource())); - } - if (type != null) { - runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType())); - } - return runnerWorkItemsStream - .collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item)); - } - @DELETE @Path("/pendingSegments/{dataSource}") @Produces(MediaType.APPLICATION_JSON) @@ -939,7 +677,7 @@ public class OverlordResource throw new ForbiddenException(authResult.getMessage()); } - if (taskMaster.isLeader()) { + if (overlord.isLeader()) { try { final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval); return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build(); @@ -970,23 +708,17 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - if (taskRunner instanceof WorkerTaskRunner) { - return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); - } else { - log.debug( - "Task runner [%s] of type [%s] does not support listing workers", - taskRunner, - taskRunner.getClass().getName() - ); - return Response.serverError() - .entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) - .build(); - } + taskRunner -> { + if (taskRunner instanceof WorkerTaskRunner) { + return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); + } else { + log.debug( + "Task runner[%s] of type[%s] does not support listing workers", + taskRunner, taskRunner.getClass().getName() + ); + return Response.serverError() + .entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) + .build(); } } ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java new file mode 100644 index 00000000000..0236afdd057 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http; + +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; + +public enum TaskStateLookup +{ + ALL, + WAITING, + PENDING, + RUNNING, + COMPLETE; + + public static TaskStateLookup fromString(@Nullable String state) + { + if (state == null) { + return ALL; + } else { + return TaskStateLookup.valueOf(StringUtils.toUpperCase(state)); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java index 7924dcb6e32..3824cc20f85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java @@ -65,7 +65,7 @@ public class OverlordBlinkLeadershipTest /** * Test that we can start taskRunner, then stop it (emulating "losing leadership", see {@link - * TaskMaster#stop()}), then creating a new taskRunner from {@link + * TaskMaster#stopBeingLeader()}), then creating a new taskRunner from {@link * org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in * {@link TaskMaster} and start it again. */ diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 7b1209e7929..15eb216d0bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -420,17 +420,11 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest // For creating a customized TaskQueue see testRealtimeIndexTaskFailure test taskStorage = setUpTaskStorage(); - handoffNotifierFactory = setUpSegmentHandOffNotifierFactory(); - dataSegmentPusher = setUpDataSegmentPusher(); - mdc = setUpMetadataStorageCoordinator(); - tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); - taskRunner = setUpThreadPoolTaskRunner(tb); - taskQueue = setUpTaskQueue(taskStorage, taskRunner); } @@ -477,7 +471,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); - tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); + tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null); return taskStorage; } @@ -738,7 +732,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest null ); - final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); + final Optional preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); final TaskStatus mergedStatus = runTask(indexTask); @@ -1235,7 +1229,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest taskQueue.start(); taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId())); - while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { + while (tsqa.getTaskStatus(indexTask.getId()).get().isRunnable()) { if (System.currentTimeMillis() > startTime + 10 * 1000) { throw new ISE("Where did the task go?!: %s", indexTask.getId()); } @@ -1319,7 +1313,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest null ); - final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); + final Optional preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); final TaskStatus mergedStatus = runTask(indexTask); @@ -1409,7 +1403,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest TaskStatus retVal = null; try { TaskStatus status; - while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { + while ((status = tsqa.getTaskStatus(taskId).get()).isRunnable()) { if (taskRunDuration.millisElapsed() > 10_000) { throw new ISE("Where did the task go?!: %s", task.getId()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java index 46e03206f49..f9a0105c067 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java @@ -20,7 +20,7 @@ package org.apache.druid.indexing.overlord.http; import com.google.common.base.Optional; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -32,48 +32,48 @@ import java.net.URLEncoder; public class OverlordRedirectInfoTest { - private TaskMaster taskMaster; + private DruidOverlord overlord; private OverlordRedirectInfo redirectInfo; @Before public void setUp() { - taskMaster = EasyMock.createMock(TaskMaster.class); - redirectInfo = new OverlordRedirectInfo(taskMaster); + overlord = EasyMock.createMock(DruidOverlord.class); + redirectInfo = new OverlordRedirectInfo(overlord); } @Test public void testDoLocalWhenLeading() { - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); + EasyMock.replay(overlord); Assert.assertTrue(redirectInfo.doLocal(null)); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path")); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test public void testDoLocalWhenNotLeading() { - EasyMock.expect(taskMaster.isLeader()).andReturn(false).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.isLeader()).andReturn(false).anyTimes(); + EasyMock.replay(overlord); Assert.assertFalse(redirectInfo.doLocal(null)); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertFalse(redirectInfo.doLocal("/druid/indexer/v1/other/path")); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test public void testGetRedirectURLWithEmptyLocation() { - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.absent()).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL("query", "/request"); Assert.assertNull(url); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test @@ -82,11 +82,11 @@ public class OverlordRedirectInfoTest String host = "http://localhost"; String query = "foo=bar&x=y"; String request = "/request"; - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL(query, request); Assert.assertEquals("http://localhost/request?foo=bar&x=y", url.toString()); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test @@ -98,14 +98,14 @@ public class OverlordRedirectInfoTest "UTF-8" ) + "/status"; - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL(null, request); Assert.assertEquals( "http://localhost/druid/indexer/v1/task/index_hadoop_datasource_2017-07-12T07%3A43%3A01.495Z/status", url.toString() ); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 4f2c3a38794..687d1deb7b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -39,13 +39,16 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; @@ -58,6 +61,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -102,7 +106,10 @@ import java.util.concurrent.atomic.AtomicReference; public class OverlordResourceTest { private OverlordResource overlordResource; + private DruidOverlord overlord; private TaskMaster taskMaster; + private TaskStorage taskStorage; + private TaskLockbox taskLockbox; private JacksonConfigManager configManager; private ProvisioningStrategy provisioningStrategy; private AuthConfig authConfig; @@ -121,12 +128,21 @@ public class OverlordResourceTest public void setUp() { taskRunner = EasyMock.createMock(TaskRunner.class); - taskQueue = EasyMock.createMock(TaskQueue.class); + taskQueue = EasyMock.createStrictMock(TaskQueue.class); configManager = EasyMock.createMock(JacksonConfigManager.class); provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); authConfig = EasyMock.createMock(AuthConfig.class); + overlord = EasyMock.createStrictMock(DruidOverlord.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); - taskQueryTool = EasyMock.createStrictMock(TaskQueryTool.class); + taskStorage = EasyMock.createStrictMock(TaskStorage.class); + taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); + taskQueryTool = new TaskQueryTool( + taskStorage, + taskLockbox, + taskMaster, + provisioningStrategy, + configManager + ); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); req = EasyMock.createStrictMock(HttpServletRequest.class); workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class); @@ -170,6 +186,7 @@ public class OverlordResourceTest }; overlordResource = new OverlordResource( + overlord, taskMaster, taskQueryTool, indexerMetadataStorageAdapter, @@ -178,7 +195,6 @@ public class OverlordResourceTest auditManager, authMapper, workerTaskRunnerQueryAdapter, - provisioningStrategy, authConfig ); } @@ -189,7 +205,8 @@ public class OverlordResourceTest EasyMock.verify( taskRunner, taskMaster, - taskQueryTool, + taskStorage, + taskLockbox, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -200,9 +217,12 @@ public class OverlordResourceTest private void replayAll() { EasyMock.replay( + overlord, taskRunner, + taskQueue, taskMaster, - taskQueryTool, + taskStorage, + taskLockbox, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -216,7 +236,7 @@ public class OverlordResourceTest @Test public void testLeader() { - EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once(); + EasyMock.expect(overlord.getCurrentLeader()).andReturn("boz").once(); replayAll(); final Response response = overlordResource.getLeader(); @@ -227,8 +247,8 @@ public class OverlordResourceTest @Test public void testIsLeader() { - EasyMock.expect(taskMaster.isLeader()).andReturn(true).once(); - EasyMock.expect(taskMaster.isLeader()).andReturn(false).once(); + EasyMock.expect(overlord.isLeader()).andReturn(true).once(); + EasyMock.expect(overlord.isLeader()).andReturn(false).once(); replayAll(); // true @@ -247,7 +267,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -282,7 +302,7 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ).andStubReturn( ImmutableList.of( @@ -313,7 +333,7 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -340,7 +360,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(), @@ -381,7 +401,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //completed tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -423,7 +443,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //active tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -466,7 +486,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -516,7 +536,7 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -550,7 +570,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null ) @@ -577,7 +597,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); Duration duration = new Period("PT86400S").toStandardDuration(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( EasyMock.anyObject(), EasyMock.anyObject() ) @@ -609,7 +629,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -658,7 +678,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -707,8 +727,10 @@ public class OverlordResourceTest replayAll(); // Verify that only the tasks of read access datasource are returned - expectedException.expect(WebApplicationException.class); - overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req); + Assert.assertThrows( + WebApplicationException.class, + () -> overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req) + ); } @Test @@ -716,7 +738,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null) @@ -749,7 +771,7 @@ public class OverlordResourceTest .getTasks("blah", "ds_test", null, null, null, req) .getEntity(); Assert.assertEquals( - "Invalid state : blah, valid values are: [pending, waiting, running, complete]", + "Invalid task state[blah]. Must be one of [pending, waiting, running, complete].", responseObject.toString() ); } @@ -824,7 +846,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); EasyMock .expect( indexerMetadataStorageAdapter.deletePendingSegments( @@ -847,7 +869,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "Some exception msg"; EasyMock .expect( @@ -873,7 +895,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "An internal defensive exception"; EasyMock .expect( @@ -899,7 +921,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "An unexpected illegal state exception"; EasyMock .expect( @@ -925,7 +947,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(false); + EasyMock.expect(overlord.isLeader()).andReturn(false); replayAll(); @@ -943,11 +965,13 @@ public class OverlordResourceTest // set authorization token properly, but isn't called in this test. // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + final NoopTask task = NoopTask.create(); - EasyMock.expect(taskQueryTool.getTask("mytask")) + EasyMock.expect(taskStorage.getTask("mytask")) .andReturn(Optional.of(task)); - EasyMock.expect(taskQueryTool.getTask("othertask")) + EasyMock.expect(taskStorage.getTask("othertask")) .andReturn(Optional.absent()); replayAll(); @@ -1042,7 +1066,7 @@ public class OverlordResourceTest ) ); - EasyMock.expect(taskQueryTool.getLockedIntervals(minTaskPriority)) + EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority)) .andReturn(expectedLockedIntervals); replayAll(); @@ -1079,13 +1103,9 @@ public class OverlordResourceTest // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(taskRunner) - ).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn( Optional.of(mockQueue) - ).anyTimes(); + ).once(); mockQueue.shutdown("id_1", "Shutdown request from user"); EasyMock.expectLastCall(); @@ -1105,14 +1125,12 @@ public class OverlordResourceTest // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(taskRunner) - ).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn( Optional.of(mockQueue) ).anyTimes(); - EasyMock.expect(taskQueryTool.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( + EasyMock.expect( + taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "datasource") + ).andStubReturn(ImmutableList.of( new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -1146,9 +1164,10 @@ public class OverlordResourceTest public void testShutdownAllTasksForNonExistingDataSource() { final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskQueryTool.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); + EasyMock.expect(taskStorage.getTaskInfos(EasyMock.anyObject(TaskLookup.class), EasyMock.anyString())) + .andReturn(Collections.emptyList()); replayAll(); final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); @@ -1222,10 +1241,7 @@ public class OverlordResourceTest @Test public void testGetTotalWorkerCapacityNotLeader() { - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.absent() - ).anyTimes(); + EasyMock.expect(overlord.isLeader()).andReturn(false); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); @@ -1235,10 +1251,13 @@ public class OverlordResourceTest public void testGetTotalWorkerCapacityWithUnknown() { WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + AtomicReference workerBehaviorConfigAtomicReference + = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)) + .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1255,6 +1274,7 @@ public class OverlordResourceTest EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1272,6 +1292,7 @@ public class OverlordResourceTest EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1317,6 +1338,7 @@ public class OverlordResourceTest workerTaskRunner, autoScaler ); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1362,6 +1384,7 @@ public class OverlordResourceTest workerTaskRunner, autoScaler ); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1452,26 +1475,12 @@ public class OverlordResourceTest @Test public void testGetMultipleTaskStatuses_presentTaskQueue() { - replayAll(); - - TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); + EasyMock.expect(taskMaster.getTaskQueue()) + .andReturn(Optional.of(taskQueue)); EasyMock.expect(taskQueue.getTaskStatus("task")) .andReturn(Optional.of(TaskStatus.running("task"))); - TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)); - EasyMock.replay(taskMaster, taskQueue); - OverlordResource overlordResource = new OverlordResource( - taskMaster, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); + replayAll(); + final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) .getEntity(); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); @@ -1480,27 +1489,11 @@ public class OverlordResourceTest @Test public void testGetMultipleTaskStatuses_absentTaskQueue() { + EasyMock.expect(taskStorage.getStatus("task")) + .andReturn(Optional.of(TaskStatus.running("task"))); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); replayAll(); - TaskQueryTool taskQueryTool = EasyMock.createMock(TaskQueryTool.class); - EasyMock.expect(taskQueryTool.getStatus("task")) - .andReturn(Optional.of(TaskStatus.running("task"))); - - TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); - EasyMock.replay(taskMaster, taskQueryTool); - OverlordResource overlordResource = new OverlordResource( - taskMaster, - taskQueryTool, - null, - null, - null, - null, - null, - null, - null, - null - ); final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) .getEntity(); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 9bbf2e9fc8a..d1c2167b929 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTaskContextEnricher; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -106,6 +107,7 @@ public class OverlordTest private TestingServer server; private Timing timing; private CuratorFramework curator; + private DruidOverlord overlord; private TaskMaster taskMaster; private TaskLockbox taskLockbox; private TaskStorage taskStorage; @@ -235,6 +237,11 @@ public class OverlordTest taskRunnerFactory.build().run(goodTask); taskMaster = new TaskMaster( + taskActionClientFactory, + supervisorManager + ); + overlord = new DruidOverlord( + taskMaster, new TaskLockConfig(), new TaskQueueConfig(null, new Period(1), null, new Period(10), null, null), new DefaultTaskConfig(), @@ -260,20 +267,23 @@ public class OverlordTest public void testOverlordRun() throws Exception { // basic task master lifecycle test - taskMaster.start(); + overlord.start(); announcementLatch.await(); - while (!taskMaster.isLeader()) { + while (!overlord.isLeader()) { // I believe the control will never reach here and thread will never sleep but just to be on safe side Thread.sleep(10); } - Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); - Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); + Assert.assertEquals(overlord.getCurrentLeader(), druidNode.getHostAndPort()); + Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation()); - final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); - final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); + final TaskQueryTool taskQueryTool + = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null); + final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter + = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class); overlordResource = new OverlordResource( + overlord, taskMaster, taskQueryTool, new IndexerMetadataStorageAdapter(taskStorage, null), @@ -282,7 +292,6 @@ public class OverlordTest auditManager, AuthTestUtils.TEST_AUTHORIZER_MAPPER, workerTaskRunnerQueryAdapter, - null, new AuthConfig() ); Response response = overlordResource.getLeader(); @@ -351,8 +360,8 @@ public class OverlordTest Assert.assertEquals(1, (((List) response.getEntity()).size())); Assert.assertEquals(1, taskMaster.getStats().rowCount()); - taskMaster.stop(); - Assert.assertFalse(taskMaster.isLeader()); + overlord.stop(); + Assert.assertFalse(overlord.isLeader()); Assert.assertEquals(0, taskMaster.getStats().rowCount()); EasyMock.verify(taskActionClientFactory); diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 4dd23f2faf2..883b7828822 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -67,6 +67,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; @@ -210,6 +211,7 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class); binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); + binder.bind(DruidOverlord.class).in(ManageLifecycle.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class); binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class); @@ -382,11 +384,11 @@ public class CliOverlord extends ServerRunnable @Provides @LazySingleton @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) - public Supplier> getHeartbeatSupplier(TaskMaster taskMaster) + public Supplier> getHeartbeatSupplier(DruidOverlord overlord) { return () -> { Map heartbeatTags = new HashMap<>(); - heartbeatTags.put("leader", taskMaster.isLeader() ? 1 : 0); + heartbeatTags.put("leader", overlord.isLeader() ? 1 : 0); return heartbeatTags; }; diff --git a/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java b/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java index 60415ea4f32..0f77da1b4c4 100644 --- a/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java +++ b/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java @@ -20,7 +20,7 @@ package org.apache.druid.cli; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.http.CoordinatorRedirectInfo; @@ -36,9 +36,9 @@ public class CoordinatorOverlordRedirectInfo implements RedirectInfo private final CoordinatorRedirectInfo coordinatorRedirectInfo; @Inject - public CoordinatorOverlordRedirectInfo(TaskMaster taskMaster, DruidCoordinator druidCoordinator) + public CoordinatorOverlordRedirectInfo(DruidOverlord druidOverlord, DruidCoordinator druidCoordinator) { - this.overlordRedirectInfo = new OverlordRedirectInfo(taskMaster); + this.overlordRedirectInfo = new OverlordRedirectInfo(druidOverlord); this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator); } diff --git a/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java b/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java new file mode 100644 index 00000000000..d33c05838a2 --- /dev/null +++ b/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.cli; + +import org.apache.druid.indexing.overlord.DruidOverlord; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CoordinatorOverlordRedirectInfoTest +{ + private DruidOverlord overlord; + private DruidCoordinator coordinator; + private CoordinatorOverlordRedirectInfo redirectInfo; + + @Before + public void setUp() + { + overlord = EasyMock.createMock(DruidOverlord.class); + coordinator = EasyMock.createMock(DruidCoordinator.class); + redirectInfo = new CoordinatorOverlordRedirectInfo(overlord, coordinator); + } + + @Test + public void testDoLocalIndexerWhenLeading() + { + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); + EasyMock.replay(overlord); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path")); + EasyMock.verify(overlord); + } +}