diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java new file mode 100644 index 00000000000..f99189d3589 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.inject.Inject; +import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.curator.discovery.ServiceAnnouncer; +import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.task.TaskContextEnricher; +import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Encapsulates the leadership lifecycle of the Druid Overlord service. + * No classes other than Resource endpoints should have this class as a dependency. + * To query the current state of the Overlord, use {@link TaskMaster} instead. + */ +public class DruidOverlord +{ + private static final EmittingLogger log = new EmittingLogger(DruidOverlord.class); + + private final DruidLeaderSelector overlordLeaderSelector; + private final DruidLeaderSelector.Listener leadershipListener; + + private final ReentrantLock giant = new ReentrantLock(true); + + private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); + + /** + * Indicates that all services have been started and the node can now announce + * itself with {@link ServiceAnnouncer#announce}. This must be set to false + * as soon as {@link DruidLeaderSelector.Listener#stopBeingLeader()} is + * called. + */ + private volatile boolean initialized; + + @Inject + public DruidOverlord( + final TaskMaster taskMaster, + final TaskLockConfig taskLockConfig, + final TaskQueueConfig taskQueueConfig, + final DefaultTaskConfig defaultTaskConfig, + final TaskLockbox taskLockbox, + final TaskStorage taskStorage, + final TaskActionClientFactory taskActionClientFactory, + @Self final DruidNode selfNode, + final TaskRunnerFactory runnerFactory, + final ServiceAnnouncer serviceAnnouncer, + final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, + final ServiceEmitter emitter, + final SupervisorManager supervisorManager, + final OverlordDutyExecutor overlordDutyExecutor, + @IndexingService final DruidLeaderSelector overlordLeaderSelector, + final SegmentAllocationQueue segmentAllocationQueue, + final ObjectMapper mapper, + final TaskContextEnricher taskContextEnricher + ) + { + this.overlordLeaderSelector = overlordLeaderSelector; + + final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : + selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); + + this.leadershipListener = new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + giant.lock(); + + // I AM THE MASTER OF THE UNIVERSE. + log.info("By the power of Grayskull, I have the power!"); + + try { + final TaskRunner taskRunner = runnerFactory.build(); + final TaskQueue taskQueue = new TaskQueue( + taskLockConfig, + taskQueueConfig, + defaultTaskConfig, + taskStorage, + taskRunner, + taskActionClientFactory, + taskLockbox, + emitter, + mapper, + taskContextEnricher + ); + + // Sensible order to start stuff: + final Lifecycle leaderLifecycle = new Lifecycle("task-master"); + if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { + log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") + .emit(); + } + + leaderLifecycle.addManagedInstance(taskRunner); + leaderLifecycle.addManagedInstance(taskQueue); + leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordDutyExecutor); + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() + { + segmentAllocationQueue.becomeLeader(); + taskMaster.becomeLeader(taskRunner, taskQueue); + + // Announce the node only after all the services have been initialized + initialized = true; + serviceAnnouncer.announce(node); + } + + @Override + public void stop() + { + serviceAnnouncer.unannounce(node); + taskMaster.stopBeingLeader(); + segmentAllocationQueue.stopBeingLeader(); + } + } + ); + + leaderLifecycle.start(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + giant.unlock(); + } + } + + @Override + public void stopBeingLeader() + { + giant.lock(); + try { + initialized = false; + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + + if (leaderLifecycle != null) { + leaderLifecycle.stop(); + } + } + finally { + giant.unlock(); + } + } + }; + } + + /** + * Starts waiting for leadership. + * Should be called only once throughout the life of the service. + */ + @LifecycleStart + public void start() + { + giant.lock(); + + try { + overlordLeaderSelector.registerListener(leadershipListener); + } + finally { + giant.unlock(); + } + } + + /** + * Stops forever (not just this particular leadership session). + * Should be called only once throughout the life of the service. + */ + @LifecycleStop + public void stop() + { + giant.lock(); + + try { + gracefulStopLeaderLifecycle(); + overlordLeaderSelector.unregisterListener(); + } + finally { + giant.unlock(); + } + } + + /** + * @return true if it's the leader and all its services have been initialized. + */ + public boolean isLeader() + { + return overlordLeaderSelector.isLeader() && initialized; + } + + public String getCurrentLeader() + { + return overlordLeaderSelector.getCurrentLeader(); + } + + public Optional getRedirectLocation() + { + String leader = overlordLeaderSelector.getCurrentLeader(); + // do not redirect when + // leader is not elected + // leader is the current node + if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) { + return Optional.absent(); + } else { + return Optional.of(leader); + } + } + + private void gracefulStopLeaderLifecycle() + { + try { + if (isLeader()) { + leadershipListener.stopBeingLeader(); + } + } + catch (Exception ex) { + // fail silently since we are stopping anyway + } + } + +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 5103f9bd87e..1de95352504 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -19,254 +19,61 @@ package org.apache.druid.indexing.overlord; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.inject.Inject; -import org.apache.druid.client.indexing.IndexingService; -import org.apache.druid.curator.discovery.ServiceAnnouncer; -import org.apache.druid.discovery.DruidLeaderSelector; -import org.apache.druid.discovery.DruidLeaderSelector.Listener; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskContextEnricher; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; -import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; -import org.apache.druid.indexing.overlord.config.TaskLockConfig; -import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.java.util.common.lifecycle.Lifecycle; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.metrics.TaskCountStatsProvider; import org.apache.druid.server.metrics.TaskSlotCountStatsProvider; import javax.annotation.Nullable; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; /** - * Encapsulates the indexer leadership lifecycle. + * Encapsulates various Overlord classes that allow querying and updating the + * current state of the Overlord leader. */ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider { - private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); - - private final DruidLeaderSelector overlordLeaderSelector; - private final DruidLeaderSelector.Listener leadershipListener; - - private final ReentrantLock giant = new ReentrantLock(true); private final TaskActionClientFactory taskActionClientFactory; private final SupervisorManager supervisorManager; - - private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); - private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; - /** - * This flag indicates that all services has been started and should be true before calling - * {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is - * called. - */ - private volatile boolean initialized; + private final AtomicBoolean isLeader = new AtomicBoolean(false); @Inject public TaskMaster( - final TaskLockConfig taskLockConfig, - final TaskQueueConfig taskQueueConfig, - final DefaultTaskConfig defaultTaskConfig, - final TaskLockbox taskLockbox, - final TaskStorage taskStorage, - final TaskActionClientFactory taskActionClientFactory, - @Self final DruidNode selfNode, - final TaskRunnerFactory runnerFactory, - final ServiceAnnouncer serviceAnnouncer, - final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, - final ServiceEmitter emitter, - final SupervisorManager supervisorManager, - final OverlordDutyExecutor overlordDutyExecutor, - @IndexingService final DruidLeaderSelector overlordLeaderSelector, - final SegmentAllocationQueue segmentAllocationQueue, - final ObjectMapper mapper, - final TaskContextEnricher taskContextEnricher + TaskActionClientFactory taskActionClientFactory, + SupervisorManager supervisorManager ) { - this.supervisorManager = supervisorManager; this.taskActionClientFactory = taskActionClientFactory; - - this.overlordLeaderSelector = overlordLeaderSelector; - - final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : - selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); - - this.leadershipListener = new DruidLeaderSelector.Listener() - { - @Override - public void becomeLeader() - { - giant.lock(); - - // I AM THE MASTER OF THE UNIVERSE. - log.info("By the power of Grayskull, I have the power!"); - - try { - taskRunner = runnerFactory.build(); - taskQueue = new TaskQueue( - taskLockConfig, - taskQueueConfig, - defaultTaskConfig, - taskStorage, - taskRunner, - taskActionClientFactory, - taskLockbox, - emitter, - mapper, - taskContextEnricher - ); - - // Sensible order to start stuff: - final Lifecycle leaderLifecycle = new Lifecycle("task-master"); - if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { - log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") - .emit(); - } - - leaderLifecycle.addManagedInstance(taskRunner); - leaderLifecycle.addManagedInstance(taskQueue); - leaderLifecycle.addManagedInstance(supervisorManager); - leaderLifecycle.addManagedInstance(overlordDutyExecutor); - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() - { - segmentAllocationQueue.becomeLeader(); - } - - @Override - public void stop() - { - segmentAllocationQueue.stopBeingLeader(); - } - } - ); - - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() - { - initialized = true; - serviceAnnouncer.announce(node); - } - - @Override - public void stop() - { - serviceAnnouncer.unannounce(node); - } - } - ); - - leaderLifecycle.start(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - giant.unlock(); - } - } - - @Override - public void stopBeingLeader() - { - giant.lock(); - try { - initialized = false; - final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); - - if (leaderLifecycle != null) { - leaderLifecycle.stop(); - } - } - finally { - giant.unlock(); - } - } - }; + this.supervisorManager = supervisorManager; } - /** - * Starts waiting for leadership. Should only be called once throughout the life of the program. - */ - @LifecycleStart - public void start() + public void becomeLeader(TaskRunner taskRunner, TaskQueue taskQueue) { - giant.lock(); - - try { - overlordLeaderSelector.registerListener(leadershipListener); - } - finally { - giant.unlock(); - } + this.taskRunner = taskRunner; + this.taskQueue = taskQueue; + isLeader.set(true); } - /** - * Stops forever (not just this particular leadership session). Should only be called once throughout the life of - * the program. - */ - @LifecycleStop - public void stop() + public void stopBeingLeader() { - giant.lock(); - - try { - gracefulStopLeaderLifecycle(); - overlordLeaderSelector.unregisterListener(); - } - finally { - giant.unlock(); - } + isLeader.set(false); + this.taskQueue = null; + this.taskRunner = null; } - /** - * Returns true if it's the leader and all its services have been initialized. - */ - public boolean isLeader() + private boolean isLeader() { - return overlordLeaderSelector.isLeader() && initialized; - } - - public String getCurrentLeader() - { - return overlordLeaderSelector.getCurrentLeader(); - } - - public Optional getRedirectLocation() - { - String leader = overlordLeaderSelector.getCurrentLeader(); - // do not redirect when - // leader is not elected - // leader is the current node - if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) { - return Optional.absent(); - } else { - return Optional.of(leader); - } + return isLeader.get(); } public Optional getTaskRunner() @@ -380,18 +187,6 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } } - private void gracefulStopLeaderLifecycle() - { - try { - if (isLeader()) { - leadershipListener.stopBeingLeader(); - } - } - catch (Exception ex) { - // fail silently since we are stopping anyway - } - } - @Override @Nullable public Map getTotalTaskSlotCount() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 29ca16f5aa9..f5351d7c6e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -20,19 +20,37 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import org.apache.druid.indexing.overlord.http.TaskStateLookup; +import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Provides read-only methods to fetch information related to tasks. @@ -42,16 +60,28 @@ import java.util.Map; */ public class TaskQueryTool { + private static final Logger log = new Logger(TaskQueryTool.class); + private final TaskStorage storage; private final TaskLockbox taskLockbox; private final TaskMaster taskMaster; + private final JacksonConfigManager configManager; + private final ProvisioningStrategy provisioningStrategy; @Inject - public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) + public TaskQueryTool( + TaskStorage storage, + TaskLockbox taskLockbox, + TaskMaster taskMaster, + ProvisioningStrategy provisioningStrategy, + JacksonConfigManager configManager + ) { this.storage = storage; this.taskLockbox = taskLockbox; this.taskMaster = taskMaster; + this.configManager = configManager; + this.provisioningStrategy = provisioningStrategy; } /** @@ -81,13 +111,10 @@ public class TaskQueryTool public List> getActiveTaskInfo(@Nullable String dataSource) { - return storage.getTaskInfos( - TaskLookup.activeTasksOnly(), - dataSource - ); + return storage.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource); } - public List getTaskStatusPlusList( + private List getTaskStatusPlusList( Map taskLookups, @Nullable String dataSource ) @@ -107,9 +134,14 @@ public class TaskQueryTool return storage.getTask(taskId); } - public Optional getStatus(final String taskId) + public Optional getTaskStatus(final String taskId) { - return storage.getStatus(taskId); + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getTaskStatus(taskId); + } else { + return storage.getStatus(taskId); + } } @Nullable @@ -118,4 +150,235 @@ public class TaskQueryTool return storage.getTaskInfo(taskId); } + public List getTaskStatusPlusList( + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String createdTimeInterval, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + Optional taskRunnerOptional = taskMaster.getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + return Collections.emptyList(); + } + final TaskRunner taskRunner = taskRunnerOptional.get(); + + final Duration createdTimeDuration; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); + createdTimeDuration = theInterval.toDuration(); + } else { + createdTimeDuration = null; + } + + // Ideally, snapshotting in taskStorage and taskRunner should be done atomically, + // but there is no way to do it today. + // Instead, we first gets a snapshot from taskStorage and then one from taskRunner. + // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process + // and use the snapshot from taskRunner as a reference for potential task state updates happened + // after the first snapshotting. + Stream taskStatusPlusStream = getTaskStatusPlusList( + state, + dataSource, + createdTimeDuration, + maxCompletedTasks, + type + ); + final Map runnerWorkItems = getTaskRunnerWorkItems( + taskRunner, + state, + dataSource, + type + ); + + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { + // We are interested in only those tasks which are in taskRunner. + taskStatusPlusStream = taskStatusPlusStream + .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); + } + final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); + + // Separate complete and active tasks from taskStorage. + // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. + final List completeTaskStatusPlusList = new ArrayList<>(); + final List activeTaskStatusPlusList = new ArrayList<>(); + for (TaskStatusPlus statusPlus : taskStatusPlusList) { + if (statusPlus.getStatusCode().isComplete()) { + completeTaskStatusPlusList.add(statusPlus); + } else { + activeTaskStatusPlusList.add(statusPlus); + } + } + + final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); + + activeTaskStatusPlusList.forEach(statusPlus -> { + final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); + if (runnerWorkItem == null) { + // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. + if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { + taskStatuses.add(statusPlus); + } + } else { + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { + taskStatuses.add( + new TaskStatusPlus( + statusPlus.getId(), + statusPlus.getGroupId(), + statusPlus.getType(), + statusPlus.getCreatedTime(), + runnerWorkItem.getQueueInsertionTime(), + statusPlus.getStatusCode(), + taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner + statusPlus.getDuration(), + runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. + statusPlus.getDataSource(), + statusPlus.getErrorMsg() + ) + ); + } + } + }); + + return taskStatuses; + } + + private Stream getTaskStatusPlusList( + TaskStateLookup state, + @Nullable String dataSource, + Duration createdTimeDuration, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + final Map taskLookups; + switch (state) { + case ALL: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + TaskLookup.ActiveTaskLookup.getInstance(), + TaskLookupType.COMPLETE, + TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case COMPLETE: + taskLookups = ImmutableMap.of( + TaskLookupType.COMPLETE, + TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case WAITING: + case PENDING: + case RUNNING: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + TaskLookup.ActiveTaskLookup.getInstance() + ); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + + final Stream taskStatusPlusStream = getTaskStatusPlusList( + taskLookups, + dataSource + ).stream(); + if (type != null) { + return taskStatusPlusStream.filter( + statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) + ); + } else { + return taskStatusPlusStream; + } + } + + private Map getTaskRunnerWorkItems( + TaskRunner taskRunner, + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String type + ) + { + Stream runnerWorkItemsStream; + switch (state) { + case ALL: + case WAITING: + // waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner) + runnerWorkItemsStream = taskRunner.getKnownTasks().stream(); + break; + case PENDING: + runnerWorkItemsStream = taskRunner.getPendingTasks().stream(); + break; + case RUNNING: + runnerWorkItemsStream = taskRunner.getRunningTasks().stream(); + break; + case COMPLETE: + runnerWorkItemsStream = Stream.empty(); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + if (dataSource != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource())); + } + if (type != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType())); + } + return runnerWorkItemsStream + .collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item)); + } + + public TotalWorkerCapacityResponse getTotalWorkerCapacity() + { + Optional taskRunnerOptional = taskMaster.getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + return null; + } + TaskRunner taskRunner = taskRunnerOptional.get(); + + Collection workers = taskRunner instanceof WorkerTaskRunner ? + ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); + + int currentCapacity = taskRunner.getTotalCapacity(); + int usedCapacity = taskRunner.getUsedCapacity(); + // Calculate maximum capacity with auto scale + int maximumCapacity; + WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } else { + // Auto-scale is not using DefaultWorkerBehaviorConfig + log.debug( + "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", + workerBehaviorConfig, + workerBehaviorConfig.getClass().getSimpleName() + ); + maximumCapacity = -1; + } + + return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity); + } + + public WorkerBehaviorConfig getLatestWorkerConfig() + { + return configManager.watch( + WorkerBehaviorConfig.CONFIG_KEY, + WorkerBehaviorConfig.class + ).get(); + } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java index 4e332f599df..41e1ef8b7c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfo.java @@ -22,7 +22,7 @@ package org.apache.druid.indexing.overlord.http; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.RedirectInfo; @@ -38,25 +38,25 @@ public class OverlordRedirectInfo implements RedirectInfo "/druid/indexer/v1/isLeader" ); - private final TaskMaster taskMaster; + private final DruidOverlord overlord; @Inject - public OverlordRedirectInfo(TaskMaster taskMaster) + public OverlordRedirectInfo(DruidOverlord overlord) { - this.taskMaster = taskMaster; + this.overlord = overlord; } @Override public boolean doLocal(String requestURI) { - return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || taskMaster.isLeader(); + return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || overlord.isLeader(); } @Override public URL getRedirectURL(String queryString, String requestURI) { try { - final Optional redirectLocation = taskMaster.getRedirectLocation(); + final Optional redirectLocation = overlord.getRedirectLocation(); if (!redirectLocation.isPresent()) { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f945cc95c1e..54ada7cb2b4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -33,6 +33,7 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; @@ -44,31 +45,23 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; -import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; -import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLookup; -import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; -import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; -import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -84,7 +77,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.joda.time.Duration; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -104,17 +97,12 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * @@ -125,6 +113,7 @@ public class OverlordResource private static final Logger log = new Logger(OverlordResource.class); private final TaskMaster taskMaster; + private final DruidOverlord overlord; private final TaskQueryTool taskQueryTool; private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private final TaskLogStreamer taskLogStreamer; @@ -132,35 +121,16 @@ public class OverlordResource private final AuditManager auditManager; private final AuthorizerMapper authorizerMapper; private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter; - private final ProvisioningStrategy provisioningStrategy; private final AuthConfig authConfig; - private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); private static final Set AUDITED_TASK_TYPES = ImmutableSet.of("index", "index_parallel", "compact", "index_hadoop", "kill"); - private enum TaskStateLookup - { - ALL, - WAITING, - PENDING, - RUNNING, - COMPLETE; - - private static TaskStateLookup fromString(@Nullable String state) - { - if (state == null) { - return ALL; - } else { - return TaskStateLookup.valueOf(StringUtils.toUpperCase(state)); - } - } - } - @Inject public OverlordResource( + DruidOverlord overlord, TaskMaster taskMaster, TaskQueryTool taskQueryTool, IndexerMetadataStorageAdapter indexerMetadataStorageAdapter, @@ -169,10 +139,10 @@ public class OverlordResource AuditManager auditManager, AuthorizerMapper authorizerMapper, WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter, - ProvisioningStrategy provisioningStrategy, AuthConfig authConfig ) { + this.overlord = overlord; this.taskMaster = taskMaster; this.taskQueryTool = taskQueryTool; this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter; @@ -181,7 +151,6 @@ public class OverlordResource this.auditManager = auditManager; this.authorizerMapper = authorizerMapper; this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter; - this.provisioningStrategy = provisioningStrategy; this.authConfig = authConfig; } @@ -252,7 +221,7 @@ public class OverlordResource @Produces(MediaType.APPLICATION_JSON) public Response getLeader() { - return Response.ok(taskMaster.getCurrentLeader()).build(); + return Response.ok(overlord.getCurrentLeader()).build(); } /** @@ -263,7 +232,7 @@ public class OverlordResource @Produces(MediaType.APPLICATION_JSON) public Response isLeader() { - final boolean leading = taskMaster.isLeader(); + final boolean leading = overlord.isLeader(); final Map response = ImmutableMap.of("leader", leading); if (leading) { return Response.ok(response).build(); @@ -373,9 +342,7 @@ public class OverlordResource taskInfo.getStatus().getStatusCode(), RunnerTaskState.WAITING, taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation() == null - ? TaskLocation.unknown() - : taskInfo.getStatus().getLocation(), + Configs.valueOrDefault(taskInfo.getStatus().getLocation(), TaskLocation.unknown()), taskInfo.getDataSource(), taskInfo.getStatus().getErrorMsg() ) @@ -415,14 +382,9 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - taskQueue.shutdown(taskid, "Shutdown request from user"); - return Response.ok(ImmutableMap.of("task", taskid)).build(); - } + taskQueue -> { + taskQueue.shutdown(taskid, "Shutdown request from user"); + return Response.ok(ImmutableMap.of("task", taskid)).build(); } ); } @@ -435,20 +397,15 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - final List> tasks = taskQueryTool.getActiveTaskInfo(dataSource); - if (tasks.isEmpty()) { - return Response.status(Status.NOT_FOUND).build(); - } else { - for (final TaskInfo task : tasks) { - taskQueue.shutdown(task.getId(), "Shutdown request from user"); - } - return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); + taskQueue -> { + final List> tasks = taskQueryTool.getActiveTaskInfo(dataSource); + if (tasks.isEmpty()) { + return Response.status(Status.NOT_FOUND).build(); + } else { + for (final TaskInfo task : tasks) { + taskQueue.shutdown(task.getId(), "Shutdown request from user"); } + return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); } } ); @@ -460,19 +417,13 @@ public class OverlordResource @ResourceFilters(StateResourceFilter.class) public Response getMultipleTaskStatuses(Set taskIds) { - if (taskIds == null || taskIds.size() == 0) { - return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build(); + if (CollectionUtils.isNullOrEmpty(taskIds)) { + return Response.status(Response.Status.BAD_REQUEST).entity("No Task IDs provided.").build(); } - final Optional taskQueue = taskMaster.getTaskQueue(); - Map result = Maps.newHashMapWithExpectedSize(taskIds.size()); + final Map result = Maps.newHashMapWithExpectedSize(taskIds.size()); for (String taskId : taskIds) { - final Optional optional; - if (taskQueue.isPresent()) { - optional = taskQueue.get().getTaskStatus(taskId); - } else { - optional = taskQueryTool.getStatus(taskId); - } + final Optional optional = taskQueryTool.getTaskStatus(taskId); if (optional.isPresent()) { result.put(taskId, optional.get()); } @@ -487,11 +438,7 @@ public class OverlordResource @ResourceFilters(ConfigResourceFilter.class) public Response getWorkerConfig() { - if (workerConfigRef == null) { - workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class); - } - - return Response.ok(workerConfigRef.get()).build(); + return Response.ok(taskQueryTool.getLatestWorkerConfig()).build(); } /** @@ -503,49 +450,11 @@ public class OverlordResource @ResourceFilters(ConfigResourceFilter.class) public Response getTotalWorkerCapacity() { - // Calculate current cluster capacity - Optional taskRunnerOptional = taskMaster.getTaskRunner(); - if (!taskRunnerOptional.isPresent()) { - // Cannot serve call as not leader + if (overlord.isLeader()) { + return Response.ok(taskQueryTool.getTotalWorkerCapacity()).build(); + } else { return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } - TaskRunner taskRunner = taskRunnerOptional.get(); - Collection workers = taskRunner instanceof WorkerTaskRunner ? - ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); - - int currentCapacity = taskRunner.getTotalCapacity(); - int usedCapacity = taskRunner.getUsedCapacity(); - // Calculate maximum capacity with auto scale - int maximumCapacity; - if (workerConfigRef == null) { - workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class); - } - WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); - if (workerBehaviorConfig == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); - maximumCapacity = -1; - } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { - DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; - if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); - maximumCapacity = -1; - } else { - int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); - int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); - maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; - } - } else { - // Auto scale is not using DefaultWorkerBehaviorConfig - log.debug( - "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", - workerBehaviorConfig, - workerBehaviorConfig.getClass().getSimpleName() - ); - maximumCapacity = -1; - } - return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build(); } // default value is used for backwards compatibility @@ -693,9 +602,13 @@ public class OverlordResource //check for valid state if (state != null) { if (!API_TASK_STATES.contains(StringUtils.toLowerCase(state))) { + String errorMessage = StringUtils.format( + "Invalid task state[%s]. Must be one of %s.", + state, API_TASK_STATES + ); return Response.status(Status.BAD_REQUEST) .type(MediaType.TEXT_PLAIN) - .entity(StringUtils.format("Invalid state : %s, valid values are: %s", state, API_TASK_STATES)) + .entity(errorMessage) .build(); } } @@ -725,8 +638,7 @@ public class OverlordResource taskMaster.getTaskRunner(), taskRunner -> { final List authorizedList = securedTaskStatusPlus( - getTaskStatusPlusList( - taskRunner, + taskQueryTool.getTaskStatusPlusList( TaskStateLookup.fromString(state), dataSource, createdTimeInterval, @@ -741,180 +653,6 @@ public class OverlordResource ); } - private List getTaskStatusPlusList( - TaskRunner taskRunner, - TaskStateLookup state, - @Nullable String dataSource, - @Nullable String createdTimeInterval, - @Nullable Integer maxCompletedTasks, - @Nullable String type - ) - { - final Duration createdTimeDuration; - if (createdTimeInterval != null) { - final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); - createdTimeDuration = theInterval.toDuration(); - } else { - createdTimeDuration = null; - } - - // Ideally, snapshotting in taskStorage and taskRunner should be done atomically, - // but there is no way to do it today. - // Instead, we first gets a snapshot from taskStorage and then one from taskRunner. - // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process - // and use the snapshot from taskRunner as a reference for potential task state updates happened - // after the first snapshotting. - Stream taskStatusPlusStream = getTaskStatusPlusList( - state, - dataSource, - createdTimeDuration, - maxCompletedTasks, - type - ); - final Map runnerWorkItems = getTaskRunnerWorkItems( - taskRunner, - state, - dataSource, - type - ); - - if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { - // We are interested in only those tasks which are in taskRunner. - taskStatusPlusStream = taskStatusPlusStream - .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); - } - final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); - - // Separate complete and active tasks from taskStorage. - // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. - final List completeTaskStatusPlusList = new ArrayList<>(); - final List activeTaskStatusPlusList = new ArrayList<>(); - for (TaskStatusPlus statusPlus : taskStatusPlusList) { - if (statusPlus.getStatusCode().isComplete()) { - completeTaskStatusPlusList.add(statusPlus); - } else { - activeTaskStatusPlusList.add(statusPlus); - } - } - - final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); - - activeTaskStatusPlusList.forEach(statusPlus -> { - final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); - if (runnerWorkItem == null) { - // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. - if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { - taskStatuses.add(statusPlus); - } - } else { - if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { - taskStatuses.add( - new TaskStatusPlus( - statusPlus.getId(), - statusPlus.getGroupId(), - statusPlus.getType(), - statusPlus.getCreatedTime(), - runnerWorkItem.getQueueInsertionTime(), - statusPlus.getStatusCode(), - taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner - statusPlus.getDuration(), - runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. - statusPlus.getDataSource(), - statusPlus.getErrorMsg() - ) - ); - } - } - }); - - return taskStatuses; - } - - private Stream getTaskStatusPlusList( - TaskStateLookup state, - @Nullable String dataSource, - Duration createdTimeDuration, - @Nullable Integer maxCompletedTasks, - @Nullable String type - ) - { - final Map taskLookups; - switch (state) { - case ALL: - taskLookups = ImmutableMap.of( - TaskLookupType.ACTIVE, - ActiveTaskLookup.getInstance(), - TaskLookupType.COMPLETE, - CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) - ); - break; - case COMPLETE: - taskLookups = ImmutableMap.of( - TaskLookupType.COMPLETE, - CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) - ); - break; - case WAITING: - case PENDING: - case RUNNING: - taskLookups = ImmutableMap.of( - TaskLookupType.ACTIVE, - ActiveTaskLookup.getInstance() - ); - break; - default: - throw new IAE("Unknown state: [%s]", state); - } - - final Stream taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList( - taskLookups, - dataSource - ).stream(); - if (type != null) { - return taskStatusPlusStream.filter( - statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) - ); - } else { - return taskStatusPlusStream; - } - } - - private Map getTaskRunnerWorkItems( - TaskRunner taskRunner, - TaskStateLookup state, - @Nullable String dataSource, - @Nullable String type - ) - { - Stream runnerWorkItemsStream; - switch (state) { - case ALL: - case WAITING: - // waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner) - runnerWorkItemsStream = taskRunner.getKnownTasks().stream(); - break; - case PENDING: - runnerWorkItemsStream = taskRunner.getPendingTasks().stream(); - break; - case RUNNING: - runnerWorkItemsStream = taskRunner.getRunningTasks().stream(); - break; - case COMPLETE: - runnerWorkItemsStream = Stream.empty(); - break; - default: - throw new IAE("Unknown state: [%s]", state); - } - if (dataSource != null) { - runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource())); - } - if (type != null) { - runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType())); - } - return runnerWorkItemsStream - .collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item)); - } - @DELETE @Path("/pendingSegments/{dataSource}") @Produces(MediaType.APPLICATION_JSON) @@ -939,7 +677,7 @@ public class OverlordResource throw new ForbiddenException(authResult.getMessage()); } - if (taskMaster.isLeader()) { + if (overlord.isLeader()) { try { final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval); return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build(); @@ -970,23 +708,17 @@ public class OverlordResource { return asLeaderWith( taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - if (taskRunner instanceof WorkerTaskRunner) { - return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); - } else { - log.debug( - "Task runner [%s] of type [%s] does not support listing workers", - taskRunner, - taskRunner.getClass().getName() - ); - return Response.serverError() - .entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) - .build(); - } + taskRunner -> { + if (taskRunner instanceof WorkerTaskRunner) { + return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); + } else { + log.debug( + "Task runner[%s] of type[%s] does not support listing workers", + taskRunner, taskRunner.getClass().getName() + ); + return Response.serverError() + .entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) + .build(); } } ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java new file mode 100644 index 00000000000..0236afdd057 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskStateLookup.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http; + +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; + +public enum TaskStateLookup +{ + ALL, + WAITING, + PENDING, + RUNNING, + COMPLETE; + + public static TaskStateLookup fromString(@Nullable String state) + { + if (state == null) { + return ALL; + } else { + return TaskStateLookup.valueOf(StringUtils.toUpperCase(state)); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java index 7924dcb6e32..3824cc20f85 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/OverlordBlinkLeadershipTest.java @@ -65,7 +65,7 @@ public class OverlordBlinkLeadershipTest /** * Test that we can start taskRunner, then stop it (emulating "losing leadership", see {@link - * TaskMaster#stop()}), then creating a new taskRunner from {@link + * TaskMaster#stopBeingLeader()}), then creating a new taskRunner from {@link * org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in * {@link TaskMaster} and start it again. */ diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 7b1209e7929..15eb216d0bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -420,17 +420,11 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest // For creating a customized TaskQueue see testRealtimeIndexTaskFailure test taskStorage = setUpTaskStorage(); - handoffNotifierFactory = setUpSegmentHandOffNotifierFactory(); - dataSegmentPusher = setUpDataSegmentPusher(); - mdc = setUpMetadataStorageCoordinator(); - tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); - taskRunner = setUpThreadPoolTaskRunner(tb); - taskQueue = setUpTaskQueue(taskStorage, taskRunner); } @@ -477,7 +471,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); - tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); + tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null); return taskStorage; } @@ -738,7 +732,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest null ); - final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); + final Optional preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); final TaskStatus mergedStatus = runTask(indexTask); @@ -1235,7 +1229,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest taskQueue.start(); taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId())); - while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { + while (tsqa.getTaskStatus(indexTask.getId()).get().isRunnable()) { if (System.currentTimeMillis() > startTime + 10 * 1000) { throw new ISE("Where did the task go?!: %s", indexTask.getId()); } @@ -1319,7 +1313,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest null ); - final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); + final Optional preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); final TaskStatus mergedStatus = runTask(indexTask); @@ -1409,7 +1403,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest TaskStatus retVal = null; try { TaskStatus status; - while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { + while ((status = tsqa.getTaskStatus(taskId).get()).isRunnable()) { if (taskRunDuration.millisElapsed() > 10_000) { throw new ISE("Where did the task go?!: %s", task.getId()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java index 46e03206f49..f9a0105c067 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordRedirectInfoTest.java @@ -20,7 +20,7 @@ package org.apache.druid.indexing.overlord.http; import com.google.common.base.Optional; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -32,48 +32,48 @@ import java.net.URLEncoder; public class OverlordRedirectInfoTest { - private TaskMaster taskMaster; + private DruidOverlord overlord; private OverlordRedirectInfo redirectInfo; @Before public void setUp() { - taskMaster = EasyMock.createMock(TaskMaster.class); - redirectInfo = new OverlordRedirectInfo(taskMaster); + overlord = EasyMock.createMock(DruidOverlord.class); + redirectInfo = new OverlordRedirectInfo(overlord); } @Test public void testDoLocalWhenLeading() { - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); + EasyMock.replay(overlord); Assert.assertTrue(redirectInfo.doLocal(null)); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path")); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test public void testDoLocalWhenNotLeading() { - EasyMock.expect(taskMaster.isLeader()).andReturn(false).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.isLeader()).andReturn(false).anyTimes(); + EasyMock.replay(overlord); Assert.assertFalse(redirectInfo.doLocal(null)); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertFalse(redirectInfo.doLocal("/druid/indexer/v1/other/path")); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test public void testGetRedirectURLWithEmptyLocation() { - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.absent()).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL("query", "/request"); Assert.assertNull(url); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test @@ -82,11 +82,11 @@ public class OverlordRedirectInfoTest String host = "http://localhost"; String query = "foo=bar&x=y"; String request = "/request"; - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL(query, request); Assert.assertEquals("http://localhost/request?foo=bar&x=y", url.toString()); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } @Test @@ -98,14 +98,14 @@ public class OverlordRedirectInfoTest "UTF-8" ) + "/status"; - EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); - EasyMock.replay(taskMaster); + EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); + EasyMock.replay(overlord); URL url = redirectInfo.getRedirectURL(null, request); Assert.assertEquals( "http://localhost/druid/indexer/v1/task/index_hadoop_datasource_2017-07-12T07%3A43%3A01.495Z/status", url.toString() ); - EasyMock.verify(taskMaster); + EasyMock.verify(overlord); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 4f2c3a38794..687d1deb7b1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -39,13 +39,16 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; @@ -58,6 +61,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -102,7 +106,10 @@ import java.util.concurrent.atomic.AtomicReference; public class OverlordResourceTest { private OverlordResource overlordResource; + private DruidOverlord overlord; private TaskMaster taskMaster; + private TaskStorage taskStorage; + private TaskLockbox taskLockbox; private JacksonConfigManager configManager; private ProvisioningStrategy provisioningStrategy; private AuthConfig authConfig; @@ -121,12 +128,21 @@ public class OverlordResourceTest public void setUp() { taskRunner = EasyMock.createMock(TaskRunner.class); - taskQueue = EasyMock.createMock(TaskQueue.class); + taskQueue = EasyMock.createStrictMock(TaskQueue.class); configManager = EasyMock.createMock(JacksonConfigManager.class); provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); authConfig = EasyMock.createMock(AuthConfig.class); + overlord = EasyMock.createStrictMock(DruidOverlord.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); - taskQueryTool = EasyMock.createStrictMock(TaskQueryTool.class); + taskStorage = EasyMock.createStrictMock(TaskStorage.class); + taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); + taskQueryTool = new TaskQueryTool( + taskStorage, + taskLockbox, + taskMaster, + provisioningStrategy, + configManager + ); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); req = EasyMock.createStrictMock(HttpServletRequest.class); workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class); @@ -170,6 +186,7 @@ public class OverlordResourceTest }; overlordResource = new OverlordResource( + overlord, taskMaster, taskQueryTool, indexerMetadataStorageAdapter, @@ -178,7 +195,6 @@ public class OverlordResourceTest auditManager, authMapper, workerTaskRunnerQueryAdapter, - provisioningStrategy, authConfig ); } @@ -189,7 +205,8 @@ public class OverlordResourceTest EasyMock.verify( taskRunner, taskMaster, - taskQueryTool, + taskStorage, + taskLockbox, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -200,9 +217,12 @@ public class OverlordResourceTest private void replayAll() { EasyMock.replay( + overlord, taskRunner, + taskQueue, taskMaster, - taskQueryTool, + taskStorage, + taskLockbox, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -216,7 +236,7 @@ public class OverlordResourceTest @Test public void testLeader() { - EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once(); + EasyMock.expect(overlord.getCurrentLeader()).andReturn("boz").once(); replayAll(); final Response response = overlordResource.getLeader(); @@ -227,8 +247,8 @@ public class OverlordResourceTest @Test public void testIsLeader() { - EasyMock.expect(taskMaster.isLeader()).andReturn(true).once(); - EasyMock.expect(taskMaster.isLeader()).andReturn(false).once(); + EasyMock.expect(overlord.isLeader()).andReturn(true).once(); + EasyMock.expect(overlord.isLeader()).andReturn(false).once(); replayAll(); // true @@ -247,7 +267,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -282,7 +302,7 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ).andStubReturn( ImmutableList.of( @@ -313,7 +333,7 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -340,7 +360,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(), @@ -381,7 +401,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //completed tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -423,7 +443,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //active tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -466,7 +486,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -516,7 +536,7 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -550,7 +570,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null ) @@ -577,7 +597,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); Duration duration = new Period("PT86400S").toStandardDuration(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( EasyMock.anyObject(), EasyMock.anyObject() ) @@ -609,7 +629,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -658,7 +678,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -707,8 +727,10 @@ public class OverlordResourceTest replayAll(); // Verify that only the tasks of read access datasource are returned - expectedException.expect(WebApplicationException.class); - overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req); + Assert.assertThrows( + WebApplicationException.class, + () -> overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req) + ); } @Test @@ -716,7 +738,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskQueryTool.getTaskStatusPlusList( + taskStorage.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null) @@ -749,7 +771,7 @@ public class OverlordResourceTest .getTasks("blah", "ds_test", null, null, null, req) .getEntity(); Assert.assertEquals( - "Invalid state : blah, valid values are: [pending, waiting, running, complete]", + "Invalid task state[blah]. Must be one of [pending, waiting, running, complete].", responseObject.toString() ); } @@ -824,7 +846,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); EasyMock .expect( indexerMetadataStorageAdapter.deletePendingSegments( @@ -847,7 +869,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "Some exception msg"; EasyMock .expect( @@ -873,7 +895,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "An internal defensive exception"; EasyMock .expect( @@ -899,7 +921,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(true); + EasyMock.expect(overlord.isLeader()).andReturn(true); final String exceptionMsg = "An unexpected illegal state exception"; EasyMock .expect( @@ -925,7 +947,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); - EasyMock.expect(taskMaster.isLeader()).andReturn(false); + EasyMock.expect(overlord.isLeader()).andReturn(false); replayAll(); @@ -943,11 +965,13 @@ public class OverlordResourceTest // set authorization token properly, but isn't called in this test. // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + final NoopTask task = NoopTask.create(); - EasyMock.expect(taskQueryTool.getTask("mytask")) + EasyMock.expect(taskStorage.getTask("mytask")) .andReturn(Optional.of(task)); - EasyMock.expect(taskQueryTool.getTask("othertask")) + EasyMock.expect(taskStorage.getTask("othertask")) .andReturn(Optional.absent()); replayAll(); @@ -1042,7 +1066,7 @@ public class OverlordResourceTest ) ); - EasyMock.expect(taskQueryTool.getLockedIntervals(minTaskPriority)) + EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority)) .andReturn(expectedLockedIntervals); replayAll(); @@ -1079,13 +1103,9 @@ public class OverlordResourceTest // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(taskRunner) - ).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn( Optional.of(mockQueue) - ).anyTimes(); + ).once(); mockQueue.shutdown("id_1", "Shutdown request from user"); EasyMock.expectLastCall(); @@ -1105,14 +1125,12 @@ public class OverlordResourceTest // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(taskRunner) - ).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn( Optional.of(mockQueue) ).anyTimes(); - EasyMock.expect(taskQueryTool.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( + EasyMock.expect( + taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "datasource") + ).andStubReturn(ImmutableList.of( new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -1146,9 +1164,10 @@ public class OverlordResourceTest public void testShutdownAllTasksForNonExistingDataSource() { final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); - EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskQueryTool.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); + EasyMock.expect(taskStorage.getTaskInfos(EasyMock.anyObject(TaskLookup.class), EasyMock.anyString())) + .andReturn(Collections.emptyList()); replayAll(); final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); @@ -1222,10 +1241,7 @@ public class OverlordResourceTest @Test public void testGetTotalWorkerCapacityNotLeader() { - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.absent() - ).anyTimes(); + EasyMock.expect(overlord.isLeader()).andReturn(false); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); @@ -1235,10 +1251,13 @@ public class OverlordResourceTest public void testGetTotalWorkerCapacityWithUnknown() { WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + AtomicReference workerBehaviorConfigAtomicReference + = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)) + .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1255,6 +1274,7 @@ public class OverlordResourceTest EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1272,6 +1292,7 @@ public class OverlordResourceTest EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); @@ -1317,6 +1338,7 @@ public class OverlordResourceTest workerTaskRunner, autoScaler ); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1362,6 +1384,7 @@ public class OverlordResourceTest workerTaskRunner, autoScaler ); + EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1452,26 +1475,12 @@ public class OverlordResourceTest @Test public void testGetMultipleTaskStatuses_presentTaskQueue() { - replayAll(); - - TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); + EasyMock.expect(taskMaster.getTaskQueue()) + .andReturn(Optional.of(taskQueue)); EasyMock.expect(taskQueue.getTaskStatus("task")) .andReturn(Optional.of(TaskStatus.running("task"))); - TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)); - EasyMock.replay(taskMaster, taskQueue); - OverlordResource overlordResource = new OverlordResource( - taskMaster, - null, - null, - null, - null, - null, - null, - null, - null, - null - ); + replayAll(); + final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) .getEntity(); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); @@ -1480,27 +1489,11 @@ public class OverlordResourceTest @Test public void testGetMultipleTaskStatuses_absentTaskQueue() { + EasyMock.expect(taskStorage.getStatus("task")) + .andReturn(Optional.of(TaskStatus.running("task"))); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); replayAll(); - TaskQueryTool taskQueryTool = EasyMock.createMock(TaskQueryTool.class); - EasyMock.expect(taskQueryTool.getStatus("task")) - .andReturn(Optional.of(TaskStatus.running("task"))); - - TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); - EasyMock.replay(taskMaster, taskQueryTool); - OverlordResource overlordResource = new OverlordResource( - taskMaster, - taskQueryTool, - null, - null, - null, - null, - null, - null, - null, - null - ); final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) .getEntity(); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 9bbf2e9fc8a..d1c2167b929 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTaskContextEnricher; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -106,6 +107,7 @@ public class OverlordTest private TestingServer server; private Timing timing; private CuratorFramework curator; + private DruidOverlord overlord; private TaskMaster taskMaster; private TaskLockbox taskLockbox; private TaskStorage taskStorage; @@ -235,6 +237,11 @@ public class OverlordTest taskRunnerFactory.build().run(goodTask); taskMaster = new TaskMaster( + taskActionClientFactory, + supervisorManager + ); + overlord = new DruidOverlord( + taskMaster, new TaskLockConfig(), new TaskQueueConfig(null, new Period(1), null, new Period(10), null, null), new DefaultTaskConfig(), @@ -260,20 +267,23 @@ public class OverlordTest public void testOverlordRun() throws Exception { // basic task master lifecycle test - taskMaster.start(); + overlord.start(); announcementLatch.await(); - while (!taskMaster.isLeader()) { + while (!overlord.isLeader()) { // I believe the control will never reach here and thread will never sleep but just to be on safe side Thread.sleep(10); } - Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); - Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); + Assert.assertEquals(overlord.getCurrentLeader(), druidNode.getHostAndPort()); + Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation()); - final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); - final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); + final TaskQueryTool taskQueryTool + = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null); + final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter + = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class); overlordResource = new OverlordResource( + overlord, taskMaster, taskQueryTool, new IndexerMetadataStorageAdapter(taskStorage, null), @@ -282,7 +292,6 @@ public class OverlordTest auditManager, AuthTestUtils.TEST_AUTHORIZER_MAPPER, workerTaskRunnerQueryAdapter, - null, new AuthConfig() ); Response response = overlordResource.getLeader(); @@ -351,8 +360,8 @@ public class OverlordTest Assert.assertEquals(1, (((List) response.getEntity()).size())); Assert.assertEquals(1, taskMaster.getStats().rowCount()); - taskMaster.stop(); - Assert.assertFalse(taskMaster.isLeader()); + overlord.stop(); + Assert.assertFalse(overlord.isLeader()); Assert.assertEquals(0, taskMaster.getStats().rowCount()); EasyMock.verify(taskActionClientFactory); diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 4dd23f2faf2..883b7828822 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -67,6 +67,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; @@ -210,6 +211,7 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class); binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); + binder.bind(DruidOverlord.class).in(ManageLifecycle.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class); binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class); @@ -382,11 +384,11 @@ public class CliOverlord extends ServerRunnable @Provides @LazySingleton @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) - public Supplier> getHeartbeatSupplier(TaskMaster taskMaster) + public Supplier> getHeartbeatSupplier(DruidOverlord overlord) { return () -> { Map heartbeatTags = new HashMap<>(); - heartbeatTags.put("leader", taskMaster.isLeader() ? 1 : 0); + heartbeatTags.put("leader", overlord.isLeader() ? 1 : 0); return heartbeatTags; }; diff --git a/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java b/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java index 60415ea4f32..0f77da1b4c4 100644 --- a/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java +++ b/services/src/main/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfo.java @@ -20,7 +20,7 @@ package org.apache.druid.cli; import com.google.inject.Inject; -import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.http.CoordinatorRedirectInfo; @@ -36,9 +36,9 @@ public class CoordinatorOverlordRedirectInfo implements RedirectInfo private final CoordinatorRedirectInfo coordinatorRedirectInfo; @Inject - public CoordinatorOverlordRedirectInfo(TaskMaster taskMaster, DruidCoordinator druidCoordinator) + public CoordinatorOverlordRedirectInfo(DruidOverlord druidOverlord, DruidCoordinator druidCoordinator) { - this.overlordRedirectInfo = new OverlordRedirectInfo(taskMaster); + this.overlordRedirectInfo = new OverlordRedirectInfo(druidOverlord); this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator); } diff --git a/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java b/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java new file mode 100644 index 00000000000..d33c05838a2 --- /dev/null +++ b/services/src/test/java/org/apache/druid/cli/CoordinatorOverlordRedirectInfoTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.cli; + +import org.apache.druid.indexing.overlord.DruidOverlord; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CoordinatorOverlordRedirectInfoTest +{ + private DruidOverlord overlord; + private DruidCoordinator coordinator; + private CoordinatorOverlordRedirectInfo redirectInfo; + + @Before + public void setUp() + { + overlord = EasyMock.createMock(DruidOverlord.class); + coordinator = EasyMock.createMock(DruidCoordinator.class); + redirectInfo = new CoordinatorOverlordRedirectInfo(overlord, coordinator); + } + + @Test + public void testDoLocalIndexerWhenLeading() + { + EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes(); + EasyMock.replay(overlord); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); + Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path")); + EasyMock.verify(overlord); + } +}