Refactor: Clean up Overlord guice dependencies (#16752)

Description:
Overlord guice dependencies are currently a little difficult to plug into.
This was encountered while working on a separate PR where a class needed to depend
on `TaskMaster.getTaskQueue()`  to query some task related info but this class itself
needs to be a dependency of `TaskMaster` so that it can be registered to the leader lifecycle.

The approach taken here is to simply decouple the leadership lifecycle of the overlord from
manipulation or querying of its state.

Changes:
- No functional change
- Add new class `DruidOverlord` to contain leadership logic after the model of `DruidCoordinator`
- The new class `DruidOverlord` should not be a dependency of any class with the exception of
REST endpoint `*Resource` classes.
- All classes that need to listen to leadership changes must be a dependency of `DruidOverlord`
so that they can be registered to the leadership lifecycle.
- Move all querying logic from `OverlordResource` to `TaskQueryTool` so that other classes can
leverage this logic too (required for follow up PR).
- Update tests
This commit is contained in:
Kashif Faraz 2024-07-19 05:00:23 -07:00 committed by GitHub
parent 35b876436b
commit b1edf4a5b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 826 additions and 682 deletions

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.TaskContextEnricher;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulates the leadership lifecycle of the Druid Overlord service.
* No classes other than Resource endpoints should have this class as a dependency.
* To query the current state of the Overlord, use {@link TaskMaster} instead.
*/
public class DruidOverlord
{
private static final EmittingLogger log = new EmittingLogger(DruidOverlord.class);
private final DruidLeaderSelector overlordLeaderSelector;
private final DruidLeaderSelector.Listener leadershipListener;
private final ReentrantLock giant = new ReentrantLock(true);
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<>(null);
/**
* Indicates that all services have been started and the node can now announce
* itself with {@link ServiceAnnouncer#announce}. This must be set to false
* as soon as {@link DruidLeaderSelector.Listener#stopBeingLeader()} is
* called.
*/
private volatile boolean initialized;
@Inject
public DruidOverlord(
final TaskMaster taskMaster,
final TaskLockConfig taskLockConfig,
final TaskQueueConfig taskQueueConfig,
final DefaultTaskConfig defaultTaskConfig,
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode selfNode,
final TaskRunnerFactory runnerFactory,
final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
final ObjectMapper mapper,
final TaskContextEnricher taskContextEnricher
)
{
this.overlordLeaderSelector = overlordLeaderSelector;
final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());
this.leadershipListener = new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
giant.lock();
// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");
try {
final TaskRunner taskRunner = runnerFactory.build();
final TaskQueue taskQueue = new TaskQueue(
taskLockConfig,
taskQueueConfig,
defaultTaskConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter,
mapper,
taskContextEnricher
);
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle("task-master");
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordDutyExecutor);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
segmentAllocationQueue.becomeLeader();
taskMaster.becomeLeader(taskRunner, taskQueue);
// Announce the node only after all the services have been initialized
initialized = true;
serviceAnnouncer.announce(node);
}
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
taskMaster.stopBeingLeader();
segmentAllocationQueue.stopBeingLeader();
}
}
);
leaderLifecycle.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
giant.unlock();
}
}
@Override
public void stopBeingLeader()
{
giant.lock();
try {
initialized = false;
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
finally {
giant.unlock();
}
}
};
}
/**
* Starts waiting for leadership.
* Should be called only once throughout the life of the service.
*/
@LifecycleStart
public void start()
{
giant.lock();
try {
overlordLeaderSelector.registerListener(leadershipListener);
}
finally {
giant.unlock();
}
}
/**
* Stops forever (not just this particular leadership session).
* Should be called only once throughout the life of the service.
*/
@LifecycleStop
public void stop()
{
giant.lock();
try {
gracefulStopLeaderLifecycle();
overlordLeaderSelector.unregisterListener();
}
finally {
giant.unlock();
}
}
/**
* @return true if it's the leader and all its services have been initialized.
*/
public boolean isLeader()
{
return overlordLeaderSelector.isLeader() && initialized;
}
public String getCurrentLeader()
{
return overlordLeaderSelector.getCurrentLeader();
}
public Optional<String> getRedirectLocation()
{
String leader = overlordLeaderSelector.getCurrentLeader();
// do not redirect when
// leader is not elected
// leader is the current node
if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) {
return Optional.absent();
} else {
return Optional.of(leader);
}
}
private void gracefulStopLeaderLifecycle()
{
try {
if (isLeader()) {
leadershipListener.stopBeingLeader();
}
}
catch (Exception ex) {
// fail silently since we are stopping anyway
}
}
}

View File

@ -19,254 +19,61 @@
package org.apache.druid.indexing.overlord; package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidLeaderSelector.Listener;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskContextEnricher;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.metrics.TaskCountStatsProvider; import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider; import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Encapsulates the indexer leadership lifecycle. * Encapsulates various Overlord classes that allow querying and updating the
* current state of the Overlord leader.
*/ */
public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider
{ {
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
private final DruidLeaderSelector overlordLeaderSelector;
private final DruidLeaderSelector.Listener leadershipListener;
private final ReentrantLock giant = new ReentrantLock(true);
private final TaskActionClientFactory taskActionClientFactory; private final TaskActionClientFactory taskActionClientFactory;
private final SupervisorManager supervisorManager; private final SupervisorManager supervisorManager;
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<>(null);
private volatile TaskRunner taskRunner; private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue; private volatile TaskQueue taskQueue;
/** private final AtomicBoolean isLeader = new AtomicBoolean(false);
* This flag indicates that all services has been started and should be true before calling
* {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is
* called.
*/
private volatile boolean initialized;
@Inject @Inject
public TaskMaster( public TaskMaster(
final TaskLockConfig taskLockConfig, TaskActionClientFactory taskActionClientFactory,
final TaskQueueConfig taskQueueConfig, SupervisorManager supervisorManager
final DefaultTaskConfig defaultTaskConfig,
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode selfNode,
final TaskRunnerFactory runnerFactory,
final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
final ObjectMapper mapper,
final TaskContextEnricher taskContextEnricher
) )
{ {
this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory; this.taskActionClientFactory = taskActionClientFactory;
this.supervisorManager = supervisorManager;
}
this.overlordLeaderSelector = overlordLeaderSelector; public void becomeLeader(TaskRunner taskRunner, TaskQueue taskQueue)
final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());
this.leadershipListener = new DruidLeaderSelector.Listener()
{ {
@Override this.taskRunner = taskRunner;
public void becomeLeader() this.taskQueue = taskQueue;
{ isLeader.set(true);
giant.lock();
// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");
try {
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskLockConfig,
taskQueueConfig,
defaultTaskConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter,
mapper,
taskContextEnricher
);
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle("task-master");
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
} }
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordDutyExecutor);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
segmentAllocationQueue.becomeLeader();
}
@Override
public void stop()
{
segmentAllocationQueue.stopBeingLeader();
}
}
);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
initialized = true;
serviceAnnouncer.announce(node);
}
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
);
leaderLifecycle.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
giant.unlock();
}
}
@Override
public void stopBeingLeader() public void stopBeingLeader()
{ {
giant.lock(); isLeader.set(false);
try { this.taskQueue = null;
initialized = false; this.taskRunner = null;
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
finally {
giant.unlock();
}
}
};
} }
/** private boolean isLeader()
* Starts waiting for leadership. Should only be called once throughout the life of the program.
*/
@LifecycleStart
public void start()
{ {
giant.lock(); return isLeader.get();
try {
overlordLeaderSelector.registerListener(leadershipListener);
}
finally {
giant.unlock();
}
}
/**
* Stops forever (not just this particular leadership session). Should only be called once throughout the life of
* the program.
*/
@LifecycleStop
public void stop()
{
giant.lock();
try {
gracefulStopLeaderLifecycle();
overlordLeaderSelector.unregisterListener();
}
finally {
giant.unlock();
}
}
/**
* Returns true if it's the leader and all its services have been initialized.
*/
public boolean isLeader()
{
return overlordLeaderSelector.isLeader() && initialized;
}
public String getCurrentLeader()
{
return overlordLeaderSelector.getCurrentLeader();
}
public Optional<String> getRedirectLocation()
{
String leader = overlordLeaderSelector.getCurrentLeader();
// do not redirect when
// leader is not elected
// leader is the current node
if (leader == null || leader.isEmpty() || overlordLeaderSelector.isLeader()) {
return Optional.absent();
} else {
return Optional.of(leader);
}
} }
public Optional<TaskRunner> getTaskRunner() public Optional<TaskRunner> getTaskRunner()
@ -380,18 +187,6 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
} }
} }
private void gracefulStopLeaderLifecycle()
{
try {
if (isLeader()) {
leadershipListener.stopBeingLeader();
}
}
catch (Exception ex) {
// fail silently since we are stopping anyway
}
}
@Override @Override
@Nullable @Nullable
public Map<String, Long> getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()

View File

@ -20,19 +20,37 @@
package org.apache.druid.indexing.overlord; package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* Provides read-only methods to fetch information related to tasks. * Provides read-only methods to fetch information related to tasks.
@ -42,16 +60,28 @@ import java.util.Map;
*/ */
public class TaskQueryTool public class TaskQueryTool
{ {
private static final Logger log = new Logger(TaskQueryTool.class);
private final TaskStorage storage; private final TaskStorage storage;
private final TaskLockbox taskLockbox; private final TaskLockbox taskLockbox;
private final TaskMaster taskMaster; private final TaskMaster taskMaster;
private final JacksonConfigManager configManager;
private final ProvisioningStrategy provisioningStrategy;
@Inject @Inject
public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) public TaskQueryTool(
TaskStorage storage,
TaskLockbox taskLockbox,
TaskMaster taskMaster,
ProvisioningStrategy provisioningStrategy,
JacksonConfigManager configManager
)
{ {
this.storage = storage; this.storage = storage;
this.taskLockbox = taskLockbox; this.taskLockbox = taskLockbox;
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.configManager = configManager;
this.provisioningStrategy = provisioningStrategy;
} }
/** /**
@ -81,13 +111,10 @@ public class TaskQueryTool
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource) public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{ {
return storage.getTaskInfos( return storage.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource);
TaskLookup.activeTasksOnly(),
dataSource
);
} }
public List<TaskStatusPlus> getTaskStatusPlusList( private List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups, Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource @Nullable String dataSource
) )
@ -107,10 +134,15 @@ public class TaskQueryTool
return storage.getTask(taskId); return storage.getTask(taskId);
} }
public Optional<TaskStatus> getStatus(final String taskId) public Optional<TaskStatus> getTaskStatus(final String taskId)
{ {
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get().getTaskStatus(taskId);
} else {
return storage.getStatus(taskId); return storage.getStatus(taskId);
} }
}
@Nullable @Nullable
public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId) public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
@ -118,4 +150,235 @@ public class TaskQueryTool
return storage.getTaskInfo(taskId); return storage.getTaskInfo(taskId);
} }
public List<TaskStatusPlus> getTaskStatusPlusList(
TaskStateLookup state,
@Nullable String dataSource,
@Nullable String createdTimeInterval,
@Nullable Integer maxCompletedTasks,
@Nullable String type
)
{
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
return Collections.emptyList();
}
final TaskRunner taskRunner = taskRunnerOptional.get();
final Duration createdTimeDuration;
if (createdTimeInterval != null) {
final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
createdTimeDuration = theInterval.toDuration();
} else {
createdTimeDuration = null;
}
// Ideally, snapshotting in taskStorage and taskRunner should be done atomically,
// but there is no way to do it today.
// Instead, we first gets a snapshot from taskStorage and then one from taskRunner.
// This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process
// and use the snapshot from taskRunner as a reference for potential task state updates happened
// after the first snapshotting.
Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusList(
state,
dataSource,
createdTimeDuration,
maxCompletedTasks,
type
);
final Map<String, ? extends TaskRunnerWorkItem> runnerWorkItems = getTaskRunnerWorkItems(
taskRunner,
state,
dataSource,
type
);
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
// We are interested in only those tasks which are in taskRunner.
taskStatusPlusStream = taskStatusPlusStream
.filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
}
final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
// Separate complete and active tasks from taskStorage.
// Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
for (TaskStatusPlus statusPlus : taskStatusPlusList) {
if (statusPlus.getStatusCode().isComplete()) {
completeTaskStatusPlusList.add(statusPlus);
} else {
activeTaskStatusPlusList.add(statusPlus);
}
}
final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
activeTaskStatusPlusList.forEach(statusPlus -> {
final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
if (runnerWorkItem == null) {
// a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
taskStatuses.add(statusPlus);
}
} else {
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
taskStatuses.add(
new TaskStatusPlus(
statusPlus.getId(),
statusPlus.getGroupId(),
statusPlus.getType(),
statusPlus.getCreatedTime(),
runnerWorkItem.getQueueInsertionTime(),
statusPlus.getStatusCode(),
taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner
statusPlus.getDuration(),
runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done.
statusPlus.getDataSource(),
statusPlus.getErrorMsg()
)
);
}
}
});
return taskStatuses;
}
private Stream<TaskStatusPlus> getTaskStatusPlusList(
TaskStateLookup state,
@Nullable String dataSource,
Duration createdTimeDuration,
@Nullable Integer maxCompletedTasks,
@Nullable String type
)
{
final Map<TaskLookupType, TaskLookup> taskLookups;
switch (state) {
case ALL:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
TaskLookup.ActiveTaskLookup.getInstance(),
TaskLookupType.COMPLETE,
TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case COMPLETE:
taskLookups = ImmutableMap.of(
TaskLookupType.COMPLETE,
TaskLookup.CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case WAITING:
case PENDING:
case RUNNING:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
TaskLookup.ActiveTaskLookup.getInstance()
);
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
final Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusList(
taskLookups,
dataSource
).stream();
if (type != null) {
return taskStatusPlusStream.filter(
statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType())
);
} else {
return taskStatusPlusStream;
}
}
private Map<String, ? extends TaskRunnerWorkItem> getTaskRunnerWorkItems(
TaskRunner taskRunner,
TaskStateLookup state,
@Nullable String dataSource,
@Nullable String type
)
{
Stream<? extends TaskRunnerWorkItem> runnerWorkItemsStream;
switch (state) {
case ALL:
case WAITING:
// waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner)
runnerWorkItemsStream = taskRunner.getKnownTasks().stream();
break;
case PENDING:
runnerWorkItemsStream = taskRunner.getPendingTasks().stream();
break;
case RUNNING:
runnerWorkItemsStream = taskRunner.getRunningTasks().stream();
break;
case COMPLETE:
runnerWorkItemsStream = Stream.empty();
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
if (dataSource != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource()));
}
if (type != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType()));
}
return runnerWorkItemsStream
.collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item));
}
public TotalWorkerCapacityResponse getTotalWorkerCapacity()
{
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
return null;
}
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();
int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers);
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
} else {
// Auto-scale is not using DefaultWorkerBehaviorConfig
log.debug(
"Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
workerBehaviorConfig,
workerBehaviorConfig.getClass().getSimpleName()
);
maximumCapacity = -1;
}
return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity);
}
public WorkerBehaviorConfig getLatestWorkerConfig()
{
return configManager.watch(
WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class
).get();
}
} }

View File

@ -22,7 +22,7 @@ package org.apache.druid.indexing.overlord.http;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.http.RedirectInfo; import org.apache.druid.server.http.RedirectInfo;
@ -38,25 +38,25 @@ public class OverlordRedirectInfo implements RedirectInfo
"/druid/indexer/v1/isLeader" "/druid/indexer/v1/isLeader"
); );
private final TaskMaster taskMaster; private final DruidOverlord overlord;
@Inject @Inject
public OverlordRedirectInfo(TaskMaster taskMaster) public OverlordRedirectInfo(DruidOverlord overlord)
{ {
this.taskMaster = taskMaster; this.overlord = overlord;
} }
@Override @Override
public boolean doLocal(String requestURI) public boolean doLocal(String requestURI)
{ {
return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || taskMaster.isLeader(); return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || overlord.isLeader();
} }
@Override @Override
public URL getRedirectURL(String queryString, String requestURI) public URL getRedirectURL(String queryString, String requestURI)
{ {
try { try {
final Optional<String> redirectLocation = taskMaster.getRedirectLocation(); final Optional<String> redirectLocation = overlord.getRedirectLocation();
if (!redirectLocation.isPresent()) { if (!redirectLocation.isPresent()) {
return null; return null;
} }

View File

@ -33,6 +33,7 @@ import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager; import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.Configs;
import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.RunnerTaskState;
@ -44,31 +45,23 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.actions.TaskActionHolder;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter; import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.HttpMediaType;
import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.ConfigResourceFilter;
@ -84,7 +77,7 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType; import org.apache.druid.server.security.ResourceType;
import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.Duration; import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -104,17 +97,12 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* *
@ -125,6 +113,7 @@ public class OverlordResource
private static final Logger log = new Logger(OverlordResource.class); private static final Logger log = new Logger(OverlordResource.class);
private final TaskMaster taskMaster; private final TaskMaster taskMaster;
private final DruidOverlord overlord;
private final TaskQueryTool taskQueryTool; private final TaskQueryTool taskQueryTool;
private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private final TaskLogStreamer taskLogStreamer; private final TaskLogStreamer taskLogStreamer;
@ -132,35 +121,16 @@ public class OverlordResource
private final AuditManager auditManager; private final AuditManager auditManager;
private final AuthorizerMapper authorizerMapper; private final AuthorizerMapper authorizerMapper;
private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter; private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
private final ProvisioningStrategy provisioningStrategy;
private final AuthConfig authConfig; private final AuthConfig authConfig;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List<String> API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); private static final List<String> API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
private static final Set<String> AUDITED_TASK_TYPES private static final Set<String> AUDITED_TASK_TYPES
= ImmutableSet.of("index", "index_parallel", "compact", "index_hadoop", "kill"); = ImmutableSet.of("index", "index_parallel", "compact", "index_hadoop", "kill");
private enum TaskStateLookup
{
ALL,
WAITING,
PENDING,
RUNNING,
COMPLETE;
private static TaskStateLookup fromString(@Nullable String state)
{
if (state == null) {
return ALL;
} else {
return TaskStateLookup.valueOf(StringUtils.toUpperCase(state));
}
}
}
@Inject @Inject
public OverlordResource( public OverlordResource(
DruidOverlord overlord,
TaskMaster taskMaster, TaskMaster taskMaster,
TaskQueryTool taskQueryTool, TaskQueryTool taskQueryTool,
IndexerMetadataStorageAdapter indexerMetadataStorageAdapter, IndexerMetadataStorageAdapter indexerMetadataStorageAdapter,
@ -169,10 +139,10 @@ public class OverlordResource
AuditManager auditManager, AuditManager auditManager,
AuthorizerMapper authorizerMapper, AuthorizerMapper authorizerMapper,
WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter, WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
ProvisioningStrategy provisioningStrategy,
AuthConfig authConfig AuthConfig authConfig
) )
{ {
this.overlord = overlord;
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.taskQueryTool = taskQueryTool; this.taskQueryTool = taskQueryTool;
this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter; this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter;
@ -181,7 +151,6 @@ public class OverlordResource
this.auditManager = auditManager; this.auditManager = auditManager;
this.authorizerMapper = authorizerMapper; this.authorizerMapper = authorizerMapper;
this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter; this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
this.provisioningStrategy = provisioningStrategy;
this.authConfig = authConfig; this.authConfig = authConfig;
} }
@ -252,7 +221,7 @@ public class OverlordResource
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Response getLeader() public Response getLeader()
{ {
return Response.ok(taskMaster.getCurrentLeader()).build(); return Response.ok(overlord.getCurrentLeader()).build();
} }
/** /**
@ -263,7 +232,7 @@ public class OverlordResource
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Response isLeader() public Response isLeader()
{ {
final boolean leading = taskMaster.isLeader(); final boolean leading = overlord.isLeader();
final Map<String, Boolean> response = ImmutableMap.of("leader", leading); final Map<String, Boolean> response = ImmutableMap.of("leader", leading);
if (leading) { if (leading) {
return Response.ok(response).build(); return Response.ok(response).build();
@ -373,9 +342,7 @@ public class OverlordResource
taskInfo.getStatus().getStatusCode(), taskInfo.getStatus().getStatusCode(),
RunnerTaskState.WAITING, RunnerTaskState.WAITING,
taskInfo.getStatus().getDuration(), taskInfo.getStatus().getDuration(),
taskInfo.getStatus().getLocation() == null Configs.valueOrDefault(taskInfo.getStatus().getLocation(), TaskLocation.unknown()),
? TaskLocation.unknown()
: taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(), taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg() taskInfo.getStatus().getErrorMsg()
) )
@ -415,15 +382,10 @@ public class OverlordResource
{ {
return asLeaderWith( return asLeaderWith(
taskMaster.getTaskQueue(), taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>() taskQueue -> {
{
@Override
public Response apply(TaskQueue taskQueue)
{
taskQueue.shutdown(taskid, "Shutdown request from user"); taskQueue.shutdown(taskid, "Shutdown request from user");
return Response.ok(ImmutableMap.of("task", taskid)).build(); return Response.ok(ImmutableMap.of("task", taskid)).build();
} }
}
); );
} }
@ -435,11 +397,7 @@ public class OverlordResource
{ {
return asLeaderWith( return asLeaderWith(
taskMaster.getTaskQueue(), taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>() taskQueue -> {
{
@Override
public Response apply(TaskQueue taskQueue)
{
final List<TaskInfo<Task, TaskStatus>> tasks = taskQueryTool.getActiveTaskInfo(dataSource); final List<TaskInfo<Task, TaskStatus>> tasks = taskQueryTool.getActiveTaskInfo(dataSource);
if (tasks.isEmpty()) { if (tasks.isEmpty()) {
return Response.status(Status.NOT_FOUND).build(); return Response.status(Status.NOT_FOUND).build();
@ -450,7 +408,6 @@ public class OverlordResource
return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); return Response.ok(ImmutableMap.of("dataSource", dataSource)).build();
} }
} }
}
); );
} }
@ -460,19 +417,13 @@ public class OverlordResource
@ResourceFilters(StateResourceFilter.class) @ResourceFilters(StateResourceFilter.class)
public Response getMultipleTaskStatuses(Set<String> taskIds) public Response getMultipleTaskStatuses(Set<String> taskIds)
{ {
if (taskIds == null || taskIds.size() == 0) { if (CollectionUtils.isNullOrEmpty(taskIds)) {
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build(); return Response.status(Response.Status.BAD_REQUEST).entity("No Task IDs provided.").build();
} }
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue(); final Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) { for (String taskId : taskIds) {
final Optional<TaskStatus> optional; final Optional<TaskStatus> optional = taskQueryTool.getTaskStatus(taskId);
if (taskQueue.isPresent()) {
optional = taskQueue.get().getTaskStatus(taskId);
} else {
optional = taskQueryTool.getStatus(taskId);
}
if (optional.isPresent()) { if (optional.isPresent()) {
result.put(taskId, optional.get()); result.put(taskId, optional.get());
} }
@ -487,11 +438,7 @@ public class OverlordResource
@ResourceFilters(ConfigResourceFilter.class) @ResourceFilters(ConfigResourceFilter.class)
public Response getWorkerConfig() public Response getWorkerConfig()
{ {
if (workerConfigRef == null) { return Response.ok(taskQueryTool.getLatestWorkerConfig()).build();
workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
}
return Response.ok(workerConfigRef.get()).build();
} }
/** /**
@ -503,49 +450,11 @@ public class OverlordResource
@ResourceFilters(ConfigResourceFilter.class) @ResourceFilters(ConfigResourceFilter.class)
public Response getTotalWorkerCapacity() public Response getTotalWorkerCapacity()
{ {
// Calculate current cluster capacity if (overlord.isLeader()) {
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner(); return Response.ok(taskQueryTool.getTotalWorkerCapacity()).build();
if (!taskRunnerOptional.isPresent()) { } else {
// Cannot serve call as not leader
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
} }
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();
int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
if (workerConfigRef == null) {
workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
}
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers);
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
} else {
// Auto scale is not using DefaultWorkerBehaviorConfig
log.debug(
"Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
workerBehaviorConfig,
workerBehaviorConfig.getClass().getSimpleName()
);
maximumCapacity = -1;
}
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build();
} }
// default value is used for backwards compatibility // default value is used for backwards compatibility
@ -693,9 +602,13 @@ public class OverlordResource
//check for valid state //check for valid state
if (state != null) { if (state != null) {
if (!API_TASK_STATES.contains(StringUtils.toLowerCase(state))) { if (!API_TASK_STATES.contains(StringUtils.toLowerCase(state))) {
String errorMessage = StringUtils.format(
"Invalid task state[%s]. Must be one of %s.",
state, API_TASK_STATES
);
return Response.status(Status.BAD_REQUEST) return Response.status(Status.BAD_REQUEST)
.type(MediaType.TEXT_PLAIN) .type(MediaType.TEXT_PLAIN)
.entity(StringUtils.format("Invalid state : %s, valid values are: %s", state, API_TASK_STATES)) .entity(errorMessage)
.build(); .build();
} }
} }
@ -725,8 +638,7 @@ public class OverlordResource
taskMaster.getTaskRunner(), taskMaster.getTaskRunner(),
taskRunner -> { taskRunner -> {
final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus( final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus(
getTaskStatusPlusList( taskQueryTool.getTaskStatusPlusList(
taskRunner,
TaskStateLookup.fromString(state), TaskStateLookup.fromString(state),
dataSource, dataSource,
createdTimeInterval, createdTimeInterval,
@ -741,180 +653,6 @@ public class OverlordResource
); );
} }
private List<TaskStatusPlus> getTaskStatusPlusList(
TaskRunner taskRunner,
TaskStateLookup state,
@Nullable String dataSource,
@Nullable String createdTimeInterval,
@Nullable Integer maxCompletedTasks,
@Nullable String type
)
{
final Duration createdTimeDuration;
if (createdTimeInterval != null) {
final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
createdTimeDuration = theInterval.toDuration();
} else {
createdTimeDuration = null;
}
// Ideally, snapshotting in taskStorage and taskRunner should be done atomically,
// but there is no way to do it today.
// Instead, we first gets a snapshot from taskStorage and then one from taskRunner.
// This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process
// and use the snapshot from taskRunner as a reference for potential task state updates happened
// after the first snapshotting.
Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusList(
state,
dataSource,
createdTimeDuration,
maxCompletedTasks,
type
);
final Map<String, ? extends TaskRunnerWorkItem> runnerWorkItems = getTaskRunnerWorkItems(
taskRunner,
state,
dataSource,
type
);
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
// We are interested in only those tasks which are in taskRunner.
taskStatusPlusStream = taskStatusPlusStream
.filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
}
final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
// Separate complete and active tasks from taskStorage.
// Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
for (TaskStatusPlus statusPlus : taskStatusPlusList) {
if (statusPlus.getStatusCode().isComplete()) {
completeTaskStatusPlusList.add(statusPlus);
} else {
activeTaskStatusPlusList.add(statusPlus);
}
}
final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
activeTaskStatusPlusList.forEach(statusPlus -> {
final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
if (runnerWorkItem == null) {
// a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
taskStatuses.add(statusPlus);
}
} else {
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
taskStatuses.add(
new TaskStatusPlus(
statusPlus.getId(),
statusPlus.getGroupId(),
statusPlus.getType(),
statusPlus.getCreatedTime(),
runnerWorkItem.getQueueInsertionTime(),
statusPlus.getStatusCode(),
taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner
statusPlus.getDuration(),
runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done.
statusPlus.getDataSource(),
statusPlus.getErrorMsg()
)
);
}
}
});
return taskStatuses;
}
private Stream<TaskStatusPlus> getTaskStatusPlusList(
TaskStateLookup state,
@Nullable String dataSource,
Duration createdTimeDuration,
@Nullable Integer maxCompletedTasks,
@Nullable String type
)
{
final Map<TaskLookupType, TaskLookup> taskLookups;
switch (state) {
case ALL:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(),
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case COMPLETE:
taskLookups = ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration)
);
break;
case WAITING:
case PENDING:
case RUNNING:
taskLookups = ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
);
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
final Stream<TaskStatusPlus> taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList(
taskLookups,
dataSource
).stream();
if (type != null) {
return taskStatusPlusStream.filter(
statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType())
);
} else {
return taskStatusPlusStream;
}
}
private Map<String, ? extends TaskRunnerWorkItem> getTaskRunnerWorkItems(
TaskRunner taskRunner,
TaskStateLookup state,
@Nullable String dataSource,
@Nullable String type
)
{
Stream<? extends TaskRunnerWorkItem> runnerWorkItemsStream;
switch (state) {
case ALL:
case WAITING:
// waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner)
runnerWorkItemsStream = taskRunner.getKnownTasks().stream();
break;
case PENDING:
runnerWorkItemsStream = taskRunner.getPendingTasks().stream();
break;
case RUNNING:
runnerWorkItemsStream = taskRunner.getRunningTasks().stream();
break;
case COMPLETE:
runnerWorkItemsStream = Stream.empty();
break;
default:
throw new IAE("Unknown state: [%s]", state);
}
if (dataSource != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource()));
}
if (type != null) {
runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType()));
}
return runnerWorkItemsStream
.collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item));
}
@DELETE @DELETE
@Path("/pendingSegments/{dataSource}") @Path("/pendingSegments/{dataSource}")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@ -939,7 +677,7 @@ public class OverlordResource
throw new ForbiddenException(authResult.getMessage()); throw new ForbiddenException(authResult.getMessage());
} }
if (taskMaster.isLeader()) { if (overlord.isLeader()) {
try { try {
final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval); final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build(); return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build();
@ -970,25 +708,19 @@ public class OverlordResource
{ {
return asLeaderWith( return asLeaderWith(
taskMaster.getTaskRunner(), taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>() taskRunner -> {
{
@Override
public Response apply(TaskRunner taskRunner)
{
if (taskRunner instanceof WorkerTaskRunner) { if (taskRunner instanceof WorkerTaskRunner) {
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
} else { } else {
log.debug( log.debug(
"Task runner[%s] of type[%s] does not support listing workers", "Task runner[%s] of type[%s] does not support listing workers",
taskRunner, taskRunner, taskRunner.getClass().getName()
taskRunner.getClass().getName()
); );
return Response.serverError() return Response.serverError()
.entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) .entity(ImmutableMap.of("error", "Task Runner does not support worker listing"))
.build(); .build();
} }
} }
}
); );
} }

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
public enum TaskStateLookup
{
ALL,
WAITING,
PENDING,
RUNNING,
COMPLETE;
public static TaskStateLookup fromString(@Nullable String state)
{
if (state == null) {
return ALL;
} else {
return TaskStateLookup.valueOf(StringUtils.toUpperCase(state));
}
}
}

View File

@ -65,7 +65,7 @@ public class OverlordBlinkLeadershipTest
/** /**
* Test that we can start taskRunner, then stop it (emulating "losing leadership", see {@link * Test that we can start taskRunner, then stop it (emulating "losing leadership", see {@link
* TaskMaster#stop()}), then creating a new taskRunner from {@link * TaskMaster#stopBeingLeader()}), then creating a new taskRunner from {@link
* org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in * org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in
* {@link TaskMaster} and start it again. * {@link TaskMaster} and start it again.
*/ */

View File

@ -420,17 +420,11 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
// For creating a customized TaskQueue see testRealtimeIndexTaskFailure test // For creating a customized TaskQueue see testRealtimeIndexTaskFailure test
taskStorage = setUpTaskStorage(); taskStorage = setUpTaskStorage();
handoffNotifierFactory = setUpSegmentHandOffNotifierFactory(); handoffNotifierFactory = setUpSegmentHandOffNotifierFactory();
dataSegmentPusher = setUpDataSegmentPusher(); dataSegmentPusher = setUpDataSegmentPusher();
mdc = setUpMetadataStorageCoordinator(); mdc = setUpMetadataStorageCoordinator();
tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc); tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);
taskRunner = setUpThreadPoolTaskRunner(tb); taskRunner = setUpThreadPoolTaskRunner(tb);
taskQueue = setUpTaskQueue(taskStorage, taskRunner); taskQueue = setUpTaskQueue(taskStorage, taskRunner);
} }
@ -477,7 +471,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(taskMaster);
tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
return taskStorage; return taskStorage;
} }
@ -738,7 +732,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
null null
); );
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId()); final Optional<TaskStatus> preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId());
Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent());
final TaskStatus mergedStatus = runTask(indexTask); final TaskStatus mergedStatus = runTask(indexTask);
@ -1235,7 +1229,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskQueue.start(); taskQueue.start();
taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId())); taskStorage.insert(indexTask, TaskStatus.running(indexTask.getId()));
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { while (tsqa.getTaskStatus(indexTask.getId()).get().isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) { if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", indexTask.getId()); throw new ISE("Where did the task go?!: %s", indexTask.getId());
} }
@ -1319,7 +1313,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
null null
); );
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId()); final Optional<TaskStatus> preRunTaskStatus = tsqa.getTaskStatus(indexTask.getId());
Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent()); Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent());
final TaskStatus mergedStatus = runTask(indexTask); final TaskStatus mergedStatus = runTask(indexTask);
@ -1409,7 +1403,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
TaskStatus retVal = null; TaskStatus retVal = null;
try { try {
TaskStatus status; TaskStatus status;
while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { while ((status = tsqa.getTaskStatus(taskId).get()).isRunnable()) {
if (taskRunDuration.millisElapsed() > 10_000) { if (taskRunDuration.millisElapsed() > 10_000) {
throw new ISE("Where did the task go?!: %s", task.getId()); throw new ISE("Where did the task go?!: %s", task.getId());
} }

View File

@ -20,7 +20,7 @@
package org.apache.druid.indexing.overlord.http; package org.apache.druid.indexing.overlord.http;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.DruidOverlord;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -32,48 +32,48 @@ import java.net.URLEncoder;
public class OverlordRedirectInfoTest public class OverlordRedirectInfoTest
{ {
private TaskMaster taskMaster; private DruidOverlord overlord;
private OverlordRedirectInfo redirectInfo; private OverlordRedirectInfo redirectInfo;
@Before @Before
public void setUp() public void setUp()
{ {
taskMaster = EasyMock.createMock(TaskMaster.class); overlord = EasyMock.createMock(DruidOverlord.class);
redirectInfo = new OverlordRedirectInfo(taskMaster); redirectInfo = new OverlordRedirectInfo(overlord);
} }
@Test @Test
public void testDoLocalWhenLeading() public void testDoLocalWhenLeading()
{ {
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(overlord);
Assert.assertTrue(redirectInfo.doLocal(null)); Assert.assertTrue(redirectInfo.doLocal(null));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path"));
EasyMock.verify(taskMaster); EasyMock.verify(overlord);
} }
@Test @Test
public void testDoLocalWhenNotLeading() public void testDoLocalWhenNotLeading()
{ {
EasyMock.expect(taskMaster.isLeader()).andReturn(false).anyTimes(); EasyMock.expect(overlord.isLeader()).andReturn(false).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(overlord);
Assert.assertFalse(redirectInfo.doLocal(null)); Assert.assertFalse(redirectInfo.doLocal(null));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader")); Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader"));
Assert.assertFalse(redirectInfo.doLocal("/druid/indexer/v1/other/path")); Assert.assertFalse(redirectInfo.doLocal("/druid/indexer/v1/other/path"));
EasyMock.verify(taskMaster); EasyMock.verify(overlord);
} }
@Test @Test
public void testGetRedirectURLWithEmptyLocation() public void testGetRedirectURLWithEmptyLocation()
{ {
EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(overlord);
URL url = redirectInfo.getRedirectURL("query", "/request"); URL url = redirectInfo.getRedirectURL("query", "/request");
Assert.assertNull(url); Assert.assertNull(url);
EasyMock.verify(taskMaster); EasyMock.verify(overlord);
} }
@Test @Test
@ -82,11 +82,11 @@ public class OverlordRedirectInfoTest
String host = "http://localhost"; String host = "http://localhost";
String query = "foo=bar&x=y"; String query = "foo=bar&x=y";
String request = "/request"; String request = "/request";
EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(overlord);
URL url = redirectInfo.getRedirectURL(query, request); URL url = redirectInfo.getRedirectURL(query, request);
Assert.assertEquals("http://localhost/request?foo=bar&x=y", url.toString()); Assert.assertEquals("http://localhost/request?foo=bar&x=y", url.toString());
EasyMock.verify(taskMaster); EasyMock.verify(overlord);
} }
@Test @Test
@ -98,14 +98,14 @@ public class OverlordRedirectInfoTest
"UTF-8" "UTF-8"
) + "/status"; ) + "/status";
EasyMock.expect(taskMaster.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes(); EasyMock.expect(overlord.getRedirectLocation()).andReturn(Optional.of(host)).anyTimes();
EasyMock.replay(taskMaster); EasyMock.replay(overlord);
URL url = redirectInfo.getRedirectURL(null, request); URL url = redirectInfo.getRedirectURL(null, request);
Assert.assertEquals( Assert.assertEquals(
"http://localhost/druid/indexer/v1/task/index_hadoop_datasource_2017-07-12T07%3A43%3A01.495Z/status", "http://localhost/druid/indexer/v1/task/index_hadoop_datasource_2017-07-12T07%3A43%3A01.495Z/status",
url.toString() url.toString()
); );
EasyMock.verify(taskMaster); EasyMock.verify(overlord);
} }
} }

View File

@ -39,13 +39,16 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
@ -58,6 +61,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.UOE;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -102,7 +106,10 @@ import java.util.concurrent.atomic.AtomicReference;
public class OverlordResourceTest public class OverlordResourceTest
{ {
private OverlordResource overlordResource; private OverlordResource overlordResource;
private DruidOverlord overlord;
private TaskMaster taskMaster; private TaskMaster taskMaster;
private TaskStorage taskStorage;
private TaskLockbox taskLockbox;
private JacksonConfigManager configManager; private JacksonConfigManager configManager;
private ProvisioningStrategy provisioningStrategy; private ProvisioningStrategy provisioningStrategy;
private AuthConfig authConfig; private AuthConfig authConfig;
@ -121,12 +128,21 @@ public class OverlordResourceTest
public void setUp() public void setUp()
{ {
taskRunner = EasyMock.createMock(TaskRunner.class); taskRunner = EasyMock.createMock(TaskRunner.class);
taskQueue = EasyMock.createMock(TaskQueue.class); taskQueue = EasyMock.createStrictMock(TaskQueue.class);
configManager = EasyMock.createMock(JacksonConfigManager.class); configManager = EasyMock.createMock(JacksonConfigManager.class);
provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
authConfig = EasyMock.createMock(AuthConfig.class); authConfig = EasyMock.createMock(AuthConfig.class);
overlord = EasyMock.createStrictMock(DruidOverlord.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class);
taskQueryTool = EasyMock.createStrictMock(TaskQueryTool.class); taskStorage = EasyMock.createStrictMock(TaskStorage.class);
taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
taskQueryTool = new TaskQueryTool(
taskStorage,
taskLockbox,
taskMaster,
provisioningStrategy,
configManager
);
indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
req = EasyMock.createStrictMock(HttpServletRequest.class); req = EasyMock.createStrictMock(HttpServletRequest.class);
workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class); workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class);
@ -170,6 +186,7 @@ public class OverlordResourceTest
}; };
overlordResource = new OverlordResource( overlordResource = new OverlordResource(
overlord,
taskMaster, taskMaster,
taskQueryTool, taskQueryTool,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
@ -178,7 +195,6 @@ public class OverlordResourceTest
auditManager, auditManager,
authMapper, authMapper,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
provisioningStrategy,
authConfig authConfig
); );
} }
@ -189,7 +205,8 @@ public class OverlordResourceTest
EasyMock.verify( EasyMock.verify(
taskRunner, taskRunner,
taskMaster, taskMaster,
taskQueryTool, taskStorage,
taskLockbox,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
req, req,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
@ -200,9 +217,12 @@ public class OverlordResourceTest
private void replayAll() private void replayAll()
{ {
EasyMock.replay( EasyMock.replay(
overlord,
taskRunner, taskRunner,
taskQueue,
taskMaster, taskMaster,
taskQueryTool, taskStorage,
taskLockbox,
indexerMetadataStorageAdapter, indexerMetadataStorageAdapter,
req, req,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
@ -216,7 +236,7 @@ public class OverlordResourceTest
@Test @Test
public void testLeader() public void testLeader()
{ {
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once(); EasyMock.expect(overlord.getCurrentLeader()).andReturn("boz").once();
replayAll(); replayAll();
final Response response = overlordResource.getLeader(); final Response response = overlordResource.getLeader();
@ -227,8 +247,8 @@ public class OverlordResourceTest
@Test @Test
public void testIsLeader() public void testIsLeader()
{ {
EasyMock.expect(taskMaster.isLeader()).andReturn(true).once(); EasyMock.expect(overlord.isLeader()).andReturn(true).once();
EasyMock.expect(taskMaster.isLeader()).andReturn(false).once(); EasyMock.expect(overlord.isLeader()).andReturn(false).once();
replayAll(); replayAll();
// true // true
@ -247,7 +267,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -282,7 +302,7 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null)
).andStubReturn( ).andStubReturn(
ImmutableList.of( ImmutableList.of(
@ -313,7 +333,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -340,7 +360,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(), ActiveTaskLookup.getInstance(),
@ -381,7 +401,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//completed tasks //completed tasks
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -423,7 +443,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
//active tasks //active tasks
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance() ActiveTaskLookup.getInstance()
@ -466,7 +486,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.ACTIVE, TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance() ActiveTaskLookup.getInstance()
@ -516,7 +536,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null null
) )
@ -550,7 +570,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)),
null null
) )
@ -577,7 +597,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
Duration duration = new Period("PT86400S").toStandardDuration(); Duration duration = new Period("PT86400S").toStandardDuration();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject() EasyMock.anyObject()
) )
@ -609,7 +629,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -658,7 +678,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks // Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null), CompleteTaskLookup.of(null, null),
@ -707,8 +727,10 @@ public class OverlordResourceTest
replayAll(); replayAll();
// Verify that only the tasks of read access datasource are returned // Verify that only the tasks of read access datasource are returned
expectedException.expect(WebApplicationException.class); Assert.assertThrows(
overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req); WebApplicationException.class,
() -> overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req)
);
} }
@Test @Test
@ -716,7 +738,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect( EasyMock.expect(
taskQueryTool.getTaskStatusPlusList( taskStorage.getTaskStatusPlusList(
ImmutableMap.of( ImmutableMap.of(
TaskLookupType.COMPLETE, TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null) CompleteTaskLookup.of(null, null)
@ -749,7 +771,7 @@ public class OverlordResourceTest
.getTasks("blah", "ds_test", null, null, null, req) .getTasks("blah", "ds_test", null, null, null, req)
.getEntity(); .getEntity();
Assert.assertEquals( Assert.assertEquals(
"Invalid state : blah, valid values are: [pending, waiting, running, complete]", "Invalid task state[blah]. Must be one of [pending, waiting, running, complete].",
responseObject.toString() responseObject.toString()
); );
} }
@ -824,7 +846,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock.expect(overlord.isLeader()).andReturn(true);
EasyMock EasyMock
.expect( .expect(
indexerMetadataStorageAdapter.deletePendingSegments( indexerMetadataStorageAdapter.deletePendingSegments(
@ -847,7 +869,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock.expect(overlord.isLeader()).andReturn(true);
final String exceptionMsg = "Some exception msg"; final String exceptionMsg = "Some exception msg";
EasyMock EasyMock
.expect( .expect(
@ -873,7 +895,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock.expect(overlord.isLeader()).andReturn(true);
final String exceptionMsg = "An internal defensive exception"; final String exceptionMsg = "An internal defensive exception";
EasyMock EasyMock
.expect( .expect(
@ -899,7 +921,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true); EasyMock.expect(overlord.isLeader()).andReturn(true);
final String exceptionMsg = "An unexpected illegal state exception"; final String exceptionMsg = "An unexpected illegal state exception";
EasyMock EasyMock
.expect( .expect(
@ -925,7 +947,7 @@ public class OverlordResourceTest
{ {
expectAuthorizationTokenCheck(); expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(false); EasyMock.expect(overlord.isLeader()).andReturn(false);
replayAll(); replayAll();
@ -943,11 +965,13 @@ public class OverlordResourceTest
// set authorization token properly, but isn't called in this test. // set authorization token properly, but isn't called in this test.
// This should be fixed in https://github.com/apache/druid/issues/6685. // This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck(); // expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
final NoopTask task = NoopTask.create(); final NoopTask task = NoopTask.create();
EasyMock.expect(taskQueryTool.getTask("mytask")) EasyMock.expect(taskStorage.getTask("mytask"))
.andReturn(Optional.of(task)); .andReturn(Optional.of(task));
EasyMock.expect(taskQueryTool.getTask("othertask")) EasyMock.expect(taskStorage.getTask("othertask"))
.andReturn(Optional.absent()); .andReturn(Optional.absent());
replayAll(); replayAll();
@ -1042,7 +1066,7 @@ public class OverlordResourceTest
) )
); );
EasyMock.expect(taskQueryTool.getLockedIntervals(minTaskPriority)) EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority))
.andReturn(expectedLockedIntervals); .andReturn(expectedLockedIntervals);
replayAll(); replayAll();
@ -1079,13 +1103,9 @@ public class OverlordResourceTest
// This should be fixed in https://github.com/apache/druid/issues/6685. // This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck(); // expectAuthorizationTokenCheck();
TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(taskRunner)
).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn( EasyMock.expect(taskMaster.getTaskQueue()).andReturn(
Optional.of(mockQueue) Optional.of(mockQueue)
).anyTimes(); ).once();
mockQueue.shutdown("id_1", "Shutdown request from user"); mockQueue.shutdown("id_1", "Shutdown request from user");
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -1105,14 +1125,12 @@ public class OverlordResourceTest
// This should be fixed in https://github.com/apache/druid/issues/6685. // This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck(); // expectAuthorizationTokenCheck();
TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class); TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(taskRunner)
).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn( EasyMock.expect(taskMaster.getTaskQueue()).andReturn(
Optional.of(mockQueue) Optional.of(mockQueue)
).anyTimes(); ).anyTimes();
EasyMock.expect(taskQueryTool.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( EasyMock.expect(
taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "datasource")
).andStubReturn(ImmutableList.of(
new TaskInfo<>( new TaskInfo<>(
"id_1", "id_1",
DateTime.now(ISOChronology.getInstanceUTC()), DateTime.now(ISOChronology.getInstanceUTC()),
@ -1146,9 +1164,10 @@ public class OverlordResourceTest
public void testShutdownAllTasksForNonExistingDataSource() public void testShutdownAllTasksForNonExistingDataSource()
{ {
final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskQueryTool.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); EasyMock.expect(taskStorage.getTaskInfos(EasyMock.anyObject(TaskLookup.class), EasyMock.anyString()))
.andReturn(Collections.emptyList());
replayAll(); replayAll();
final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
@ -1222,10 +1241,7 @@ public class OverlordResourceTest
@Test @Test
public void testGetTotalWorkerCapacityNotLeader() public void testGetTotalWorkerCapacityNotLeader()
{ {
EasyMock.reset(taskMaster); EasyMock.expect(overlord.isLeader()).andReturn(false);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.absent()
).anyTimes();
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus());
@ -1235,10 +1251,13 @@ public class OverlordResourceTest
public void testGetTotalWorkerCapacityWithUnknown() public void testGetTotalWorkerCapacityWithUnknown()
{ {
WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class))
.andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
@ -1255,6 +1274,7 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
@ -1272,6 +1292,7 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
@ -1317,6 +1338,7 @@ public class OverlordResourceTest
workerTaskRunner, workerTaskRunner,
autoScaler autoScaler
); );
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1362,6 +1384,7 @@ public class OverlordResourceTest
workerTaskRunner, workerTaskRunner,
autoScaler autoScaler
); );
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll(); replayAll();
final Response response = overlordResource.getTotalWorkerCapacity(); final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1452,26 +1475,12 @@ public class OverlordResourceTest
@Test @Test
public void testGetMultipleTaskStatuses_presentTaskQueue() public void testGetMultipleTaskStatuses_presentTaskQueue()
{ {
replayAll(); EasyMock.expect(taskMaster.getTaskQueue())
.andReturn(Optional.of(taskQueue));
TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskQueue.getTaskStatus("task")) EasyMock.expect(taskQueue.getTaskStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task"))); .andReturn(Optional.of(TaskStatus.running("task")));
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); replayAll();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
EasyMock.replay(taskMaster, taskQueue);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
.getEntity(); .getEntity();
Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
@ -1480,27 +1489,11 @@ public class OverlordResourceTest
@Test @Test
public void testGetMultipleTaskStatuses_absentTaskQueue() public void testGetMultipleTaskStatuses_absentTaskQueue()
{ {
EasyMock.expect(taskStorage.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
replayAll(); replayAll();
TaskQueryTool taskQueryTool = EasyMock.createMock(TaskQueryTool.class);
EasyMock.expect(taskQueryTool.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
EasyMock.replay(taskMaster, taskQueryTool);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
taskQueryTool,
null,
null,
null,
null,
null,
null,
null,
null
);
final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task")) final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
.getEntity(); .getEntity();
Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);

View File

@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher; import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -106,6 +107,7 @@ public class OverlordTest
private TestingServer server; private TestingServer server;
private Timing timing; private Timing timing;
private CuratorFramework curator; private CuratorFramework curator;
private DruidOverlord overlord;
private TaskMaster taskMaster; private TaskMaster taskMaster;
private TaskLockbox taskLockbox; private TaskLockbox taskLockbox;
private TaskStorage taskStorage; private TaskStorage taskStorage;
@ -235,6 +237,11 @@ public class OverlordTest
taskRunnerFactory.build().run(goodTask); taskRunnerFactory.build().run(goodTask);
taskMaster = new TaskMaster( taskMaster = new TaskMaster(
taskActionClientFactory,
supervisorManager
);
overlord = new DruidOverlord(
taskMaster,
new TaskLockConfig(), new TaskLockConfig(),
new TaskQueueConfig(null, new Period(1), null, new Period(10), null, null), new TaskQueueConfig(null, new Period(1), null, new Period(10), null, null),
new DefaultTaskConfig(), new DefaultTaskConfig(),
@ -260,20 +267,23 @@ public class OverlordTest
public void testOverlordRun() throws Exception public void testOverlordRun() throws Exception
{ {
// basic task master lifecycle test // basic task master lifecycle test
taskMaster.start(); overlord.start();
announcementLatch.await(); announcementLatch.await();
while (!taskMaster.isLeader()) { while (!overlord.isLeader()) {
// I believe the control will never reach here and thread will never sleep but just to be on safe side // I believe the control will never reach here and thread will never sleep but just to be on safe side
Thread.sleep(10); Thread.sleep(10);
} }
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(overlord.getCurrentLeader(), druidNode.getHostAndPort());
Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation());
final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); final TaskQueryTool taskQueryTool
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
= new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff // Test Overlord resource stuff
AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class); AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class);
overlordResource = new OverlordResource( overlordResource = new OverlordResource(
overlord,
taskMaster, taskMaster,
taskQueryTool, taskQueryTool,
new IndexerMetadataStorageAdapter(taskStorage, null), new IndexerMetadataStorageAdapter(taskStorage, null),
@ -282,7 +292,6 @@ public class OverlordTest
auditManager, auditManager,
AuthTestUtils.TEST_AUTHORIZER_MAPPER, AuthTestUtils.TEST_AUTHORIZER_MAPPER,
workerTaskRunnerQueryAdapter, workerTaskRunnerQueryAdapter,
null,
new AuthConfig() new AuthConfig()
); );
Response response = overlordResource.getLeader(); Response response = overlordResource.getLeader();
@ -351,8 +360,8 @@ public class OverlordTest
Assert.assertEquals(1, (((List) response.getEntity()).size())); Assert.assertEquals(1, (((List) response.getEntity()).size()));
Assert.assertEquals(1, taskMaster.getStats().rowCount()); Assert.assertEquals(1, taskMaster.getStats().rowCount());
taskMaster.stop(); overlord.stop();
Assert.assertFalse(taskMaster.isLeader()); Assert.assertFalse(overlord.isLeader());
Assert.assertEquals(0, taskMaster.getStats().rowCount()); Assert.assertEquals(0, taskMaster.getStats().rowCount());
EasyMock.verify(taskActionClientFactory); EasyMock.verify(taskActionClientFactory);

View File

@ -67,6 +67,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient; import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory; import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
@ -210,6 +211,7 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(DruidOverlord.class).in(ManageLifecycle.class);
binder.bind(TaskMaster.class).in(ManageLifecycle.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
@ -382,11 +384,11 @@ public class CliOverlord extends ServerRunnable
@Provides @Provides
@LazySingleton @LazySingleton
@Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
public Supplier<Map<String, Object>> getHeartbeatSupplier(TaskMaster taskMaster) public Supplier<Map<String, Object>> getHeartbeatSupplier(DruidOverlord overlord)
{ {
return () -> { return () -> {
Map<String, Object> heartbeatTags = new HashMap<>(); Map<String, Object> heartbeatTags = new HashMap<>();
heartbeatTags.put("leader", taskMaster.isLeader() ? 1 : 0); heartbeatTags.put("leader", overlord.isLeader() ? 1 : 0);
return heartbeatTags; return heartbeatTags;
}; };

View File

@ -20,7 +20,7 @@
package org.apache.druid.cli; package org.apache.druid.cli;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo; import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo;
import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.CoordinatorRedirectInfo; import org.apache.druid.server.http.CoordinatorRedirectInfo;
@ -36,9 +36,9 @@ public class CoordinatorOverlordRedirectInfo implements RedirectInfo
private final CoordinatorRedirectInfo coordinatorRedirectInfo; private final CoordinatorRedirectInfo coordinatorRedirectInfo;
@Inject @Inject
public CoordinatorOverlordRedirectInfo(TaskMaster taskMaster, DruidCoordinator druidCoordinator) public CoordinatorOverlordRedirectInfo(DruidOverlord druidOverlord, DruidCoordinator druidCoordinator)
{ {
this.overlordRedirectInfo = new OverlordRedirectInfo(taskMaster); this.overlordRedirectInfo = new OverlordRedirectInfo(druidOverlord);
this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator); this.coordinatorRedirectInfo = new CoordinatorRedirectInfo(druidCoordinator);
} }

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class CoordinatorOverlordRedirectInfoTest
{
private DruidOverlord overlord;
private DruidCoordinator coordinator;
private CoordinatorOverlordRedirectInfo redirectInfo;
@Before
public void setUp()
{
overlord = EasyMock.createMock(DruidOverlord.class);
coordinator = EasyMock.createMock(DruidCoordinator.class);
redirectInfo = new CoordinatorOverlordRedirectInfo(overlord, coordinator);
}
@Test
public void testDoLocalIndexerWhenLeading()
{
EasyMock.expect(overlord.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(overlord);
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path"));
EasyMock.verify(overlord);
}
}