Make Overlord auto-scaling and provisioning extensible (#4730)
* Make AutoScaler, ProvisioningStrategy and BaseWorkerBehaviorConfig extension points; more logging in PendingTaskBasedWorkerProvisioningStrategy
* Address comments and fix a bug
* Extract method
* Debug logging
* Rename BaseWorkerBehaviorConfig to WorkerBehaviorConfig and WorkerBehaviorConfig to DefaultWorkerBehaviorConfig
* Fixes
Parent: 6f91d9ca1e
Commit: 3f1009aaa1
AuditEntry.java
@@ -74,7 +74,6 @@ public class AuditEntry
 }

 /**
-* @param None
 * @return returns payload as String
 */
 @JsonProperty
@@ -82,8 +81,8 @@ public class AuditEntry
 {
 return payload;
 }

 /**
-* @param None
 * @return audit time as DateTime
 */
 @JsonProperty

ImmutableWorkerInfo.java
@@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
+import io.druid.guice.annotations.PublicApi;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.worker.Worker;
 import org.joda.time.DateTime;
@@ -33,6 +34,7 @@ import java.util.Set;
 /**
 * A snapshot of a Worker and its current state i.e tasks assigned to that worker.
 */
+@PublicApi
 public class ImmutableWorkerInfo
 {
 private final Worker worker;

TaskRunner.java
@@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;

+import io.druid.guice.annotations.PublicApi;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.autoscaling.ScalingStats;
@@ -35,6 +36,7 @@ import java.util.concurrent.Executor;
 * Interface for handing off tasks. Managed by a {@link TaskQueue}.
 * Holds state
 */
+@PublicApi
 public interface TaskRunner
 {
 /**

WorkerTaskRunner.java
@@ -19,31 +19,28 @@
 package io.druid.indexing.overlord;

 import com.google.common.base.Predicate;
+import io.druid.guice.annotations.PublicApi;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
 import io.druid.indexing.worker.Worker;

 import java.util.Collection;

+@PublicApi
 public interface WorkerTaskRunner extends TaskRunner
 {
 /**
-* List of known workers who can accept tasks
-* @return A list of workers who can accept tasks for running
+* List of known workers who can accept tasks for running
 */
 Collection<ImmutableWorkerInfo> getWorkers();

 /**
 * Return a list of workers who can be reaped by autoscaling
-* @return Workers which can be reaped by autoscaling
 */
 Collection<Worker> getLazyWorkers();

 /**
 * Check which workers can be marked as lazy
-* @param isLazyWorker
-* @param maxWorkers
-* @return
 */
 Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);

AutoScaler.java
@@ -21,8 +21,10 @@ package io.druid.indexing.overlord.autoscaling;

 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.guice.annotations.ExtensionPoint;
 import io.druid.indexing.overlord.autoscaling.ec2.EC2AutoScaler;

+import javax.annotation.Nullable;
 import java.util.List;

 /**
@@ -32,6 +34,7 @@ import java.util.List;
 @JsonSubTypes(value = {
 @JsonSubTypes.Type(name = "ec2", value = EC2AutoScaler.class)
 })
+@ExtensionPoint
 public interface AutoScaler<T>
 {
 int getMinNumWorkers();
@@ -40,10 +43,13 @@ public interface AutoScaler<T>

 T getEnvConfig();

+@Nullable
 AutoScalingData provision();

+@Nullable
 AutoScalingData terminate(List<String> ips);

+@Nullable
 AutoScalingData terminateWithIds(List<String> ids);

 /**

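Note: provision(), terminate(), and terminateWithIds() are now annotated @Nullable, so any caller of an AutoScaler (including extension-supplied implementations) should expect a null result, as the provisioning strategies below do. A minimal caller sketch under that assumption — the helper class and method names here are illustrative and not part of the patch:

import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;

import java.util.Collections;
import java.util.List;

// Illustrative helper: ask the AutoScaler to provision once and return the new node ids,
// or an empty list when nothing was provisioned (e.g. a no-op implementation returning null).
class ProvisionOnce
{
  static List<String> provisionedNodeIds(AutoScaler<?> autoScaler)
  {
    final AutoScalingData provisioned = autoScaler.provision();
    if (provisioned == null || provisioned.getNodeIds().isEmpty()) {
      return Collections.emptyList();
    }
    return provisioned.getNodeIds();
  }
}
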
PendingTaskBasedWorkerProvisioningStrategy.java
@@ -36,6 +36,7 @@ import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.ImmutableWorkerInfo;
 import io.druid.indexing.overlord.WorkerTaskRunner;
 import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
 import io.druid.indexing.worker.Worker;
@@ -43,6 +44,7 @@ import io.druid.java.util.common.DateTimes;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;

+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -58,6 +60,34 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr

 private static final String SCHEME = "http";

+@Nullable
+static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
+Supplier<WorkerBehaviorConfig> workerConfigRef,
+String action,
+EmittingLogger log
+)
+{
+final WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+if (workerBehaviorConfig == null) {
+log.error("No workerConfig available, cannot %s workers.", action);
+return null;
+}
+if (!(workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig)) {
+log.error(
+"Only DefaultWorkerBehaviorConfig is supported as WorkerBehaviorConfig, [%s] given, cannot %s workers",
+workerBehaviorConfig,
+action
+);
+return null;
+}
+final DefaultWorkerBehaviorConfig workerConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+if (workerConfig.getAutoScaler() == null) {
+log.error("No autoScaler available, cannot %s workers", action);
+return null;
+}
+return workerConfig;
+}
+
 private final PendingTaskBasedWorkerProvisioningConfig config;
 private final Supplier<WorkerBehaviorConfig> workerConfigRef;

@@ -121,11 +151,12 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 public synchronized boolean doProvision()
 {
 Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
+log.debug("Pending tasks: %d %s", pendingTasks.size(), pendingTasks);
 Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
+log.debug("Workers: %d %s", workers.size(), workers);
 boolean didProvision = false;
-final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
-if (workerConfig == null || workerConfig.getAutoScaler() == null) {
-log.error("No workerConfig available, cannot provision new workers.");
+final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+if (workerConfig == null) {
 return false;
 }

@@ -143,25 +174,33 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 ),
 workerConfig
 );
+log.info("Currently provisioning: %d %s", currentlyProvisioning.size(), currentlyProvisioning);
 currentlyProvisioning.removeAll(workerNodeIds);
+log.debug(
+"Currently provisioning without WorkerNodeIds: %d %s",
+currentlyProvisioning.size(),
+currentlyProvisioning
+);
 if (currentlyProvisioning.isEmpty()) {
-int want = getScaleUpNodeCount(
+int workersToProvision = getScaleUpNodeCount(
 runner.getConfig(),
 workerConfig,
 pendingTasks,
 workers
 );
-while (want > 0) {
+log.info("Workers to provision: %d", workersToProvision);
+while (workersToProvision > 0) {
 final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
 final List<String> newNodes;
 if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
 log.warn("NewNodes is empty, returning from provision loop");
 break;
 } else {
+log.info("Provisioned: %d [%s]", provisioned.getNodeIds().size(), provisioned.getNodeIds());
 currentlyProvisioning.addAll(newNodes);
 lastProvisionTime = DateTimes.nowUtc();
 scalingStats.addProvisionEvent(provisioned);
-want -= provisioned.getNodeIds().size();
+workersToProvision -= provisioned.getNodeIds().size();
 didProvision = true;
 }
 }
@@ -182,35 +221,38 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 return didProvision;
 }

-private Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig)
+private Collection<String> getWorkerNodeIDs(Collection<Worker> workers, DefaultWorkerBehaviorConfig workerConfig)
 {
 List<String> ips = new ArrayList<>(workers.size());
 for (Worker worker : workers) {
 ips.add(worker.getIp());
 }
-return workerConfig.getAutoScaler().ipToIdLookup(ips);
+List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(ips);
+log.info("WorkerNodeIds: %d %s", workerNodeIds.size(), workerNodeIds);
+return workerNodeIds;
 }

 private int getScaleUpNodeCount(
 final WorkerTaskRunnerConfig remoteTaskRunnerConfig,
-final WorkerBehaviorConfig workerConfig,
+final DefaultWorkerBehaviorConfig workerConfig,
 final Collection<Task> pendingTasks,
 final Collection<ImmutableWorkerInfo> workers
 )
 {
 final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
 final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
-final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
-final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
+log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
+final int currValidWorkers = getCurrValidWorkers(workers);

-// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need since
-// we are not aware of the expectedWorkerCapacity.
+// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
+// since we are not aware of the expectedWorkerCapacity.
 int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
 remoteTaskRunnerConfig,
 workerConfig,
 pendingTasks,
 workers
 );
+log.debug("More workers needed: %d", moreWorkersNeeded);

 int want = Math.max(
 minWorkerCount - currValidWorkers,
@@ -218,9 +260,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 Math.min(config.getMaxScalingStep(), moreWorkersNeeded)
 // Additional workers needed to run current pending tasks
 );
+log.info("Want workers: %d", want);

 if (want > 0 && currValidWorkers >= maxWorkerCount) {
-log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount);
+log.warn(
+"Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].",
+currValidWorkers,
+maxWorkerCount
+);
 return 0;
 }
 want = Math.min(want, maxWorkerCount - currValidWorkers);

@@ -229,7 +276,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr

 private int getWorkersNeededToAssignTasks(
 final WorkerTaskRunnerConfig workerTaskRunnerConfig,
-final WorkerBehaviorConfig workerConfig,
+final DefaultWorkerBehaviorConfig workerConfig,
 final Collection<Task> pendingTasks,
 final Collection<ImmutableWorkerInfo> workers
 )
@@ -238,6 +285,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 workers,
 ProvisioningUtil.createValidWorkerPredicate(config)
 );
+log.debug("Valid workers: %d %s", validWorkers.size(), validWorkers);

 Map<String, ImmutableWorkerInfo> workersMap = Maps.newHashMap();
 for (ImmutableWorkerInfo worker : validWorkers) {
@@ -246,6 +294,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
 int need = 0;
 int capacity = getExpectedWorkerCapacity(workers);
+log.info("Expected worker capacity: %d", capacity);

 // Simulate assigning tasks to dummy workers using configured workerSelectStrategy
 // the number of additional workers needed to assign all the pending tasks is noted
@@ -258,6 +307,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 final ImmutableWorkerInfo workerRunningTask;
 if (selectedWorker != null) {
 workerRunningTask = selectedWorker;
+log.debug("Worker[%s] able to take the task[%s]", task, workerRunningTask);
 } else {
 // None of the existing worker can run this task, we need to provision one worker for it.
 // create a dummy worker and try to simulate assigning task to it.
@@ -267,6 +317,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 capacity,
 workerTaskRunnerConfig.getMinWorkerVersion()
 );
+log.debug("Need more workers, creating a dummy worker[%s]", workerRunningTask);
 need++;
 }
 // Update map with worker running task

@@ -279,12 +330,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 public synchronized boolean doTerminate()
 {
 Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
-final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
+log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers);
+final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
 if (workerConfig == null) {
-log.warn("No workerConfig available, cannot terminate workers.");
 return false;
 }

+log.info("Currently provisioning: %d %s", currentlyProvisioning.size(), currentlyProvisioning);
 if (!currentlyProvisioning.isEmpty()) {
 log.debug("Already provisioning nodes, Not Terminating any nodes.");
 return false;
@@ -292,17 +344,17 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr

 boolean didTerminate = false;
 final Collection<String> workerNodeIds = getWorkerNodeIDs(runner.getLazyWorkers(), workerConfig);
-final Set<String> stillExisting = Sets.newHashSet();
-for (String s : currentlyTerminating) {
-if (workerNodeIds.contains(s)) {
-stillExisting.add(s);
-}
-}
-currentlyTerminating.clear();
-currentlyTerminating.addAll(stillExisting);
+log.debug("Currently terminating: %d %s", currentlyTerminating.size(), currentlyTerminating);
+currentlyTerminating.retainAll(workerNodeIds);
+log.debug(
+"Currently terminating among WorkerNodeIds: %d %s",
+currentlyTerminating.size(),
+currentlyTerminating
+);

 if (currentlyTerminating.isEmpty()) {
 final int maxWorkersToTerminate = maxWorkersToTerminate(zkWorkers, workerConfig);
+log.info("Max workers to terminate: %d", maxWorkersToTerminate);
 final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
 final Collection<String> laziestWorkerIps =
 Collections2.transform(
@@ -316,6 +368,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 }
 }
 );
+log.info("Laziest worker ips: %d %s", laziestWorkerIps.size(), laziestWorkerIps);
 if (laziestWorkerIps.isEmpty()) {
 log.debug("Found no lazy workers");
 } else {
@@ -328,6 +381,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 final AutoScalingData terminated = workerConfig.getAutoScaler()
 .terminate(ImmutableList.copyOf(laziestWorkerIps));
 if (terminated != null) {
+log.info("Terminated: %d %s", terminated.getNodeIds().size(), terminated.getNodeIds());
 currentlyTerminating.addAll(terminated.getNodeIds());
 lastTerminateTime = DateTimes.nowUtc();
 scalingStats.addTerminateEvent(terminated);
@@ -359,12 +413,12 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 }
 }

-private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, WorkerBehaviorConfig workerConfig)
+private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, DefaultWorkerBehaviorConfig workerConfig)
 {
-final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
-final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
+final int currValidWorkers = getCurrValidWorkers(zkWorkers);
 final int invalidWorkers = zkWorkers.size() - currValidWorkers;
 final int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers();
+log.info("Min workers: %d", minWorkers);

 // Max workers that can be terminated
 // All invalid workers + any lazy workers above minCapacity
@@ -377,6 +431,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 );
 }

+private int getCurrValidWorkers(Collection<ImmutableWorkerInfo> workers)
+{
+final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
+final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
+log.debug("Current valid workers: %d", currValidWorkers);
+return currValidWorkers;
+}
+
 private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
 {
 int size = workers.size();

Provisioner.java
@@ -19,7 +19,10 @@

 package io.druid.indexing.overlord.autoscaling;

-interface Provisioner
+import io.druid.guice.annotations.PublicApi;
+
+@PublicApi
+public interface Provisioner
 {
 boolean doTerminate();

ProvisioningService.java
@@ -19,6 +19,8 @@

 package io.druid.indexing.overlord.autoscaling;

+import io.druid.guice.annotations.PublicApi;
+
 import java.io.Closeable;

 /**
@@ -29,6 +31,7 @@ import java.io.Closeable;
 *
 * @see ProvisioningStrategy#makeProvisioningService
 */
+@PublicApi
 public interface ProvisioningService extends Closeable
 {
 /**

ProvisioningStrategy.java
@@ -19,11 +19,13 @@

 package io.druid.indexing.overlord.autoscaling;

+import io.druid.guice.annotations.ExtensionPoint;
 import io.druid.indexing.overlord.TaskRunner;

 /**
 * In general, the resource management is tied to the runner.
 */
+@ExtensionPoint
 public interface ProvisioningStrategy<T extends TaskRunner>
 {
 /**

ScalingStats.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Ordering;
+import io.druid.guice.annotations.PublicApi;
 import io.druid.java.util.common.DateTimes;
 import org.joda.time.DateTime;

@@ -33,6 +34,7 @@ import java.util.List;

 /**
 */
+@PublicApi
 public class ScalingStats
 {
 public enum EVENT
@@ -66,6 +68,15 @@ public class ScalingStats
 }
 }

+public void addAll(ScalingStats stats)
+{
+synchronized (lock) {
+synchronized (stats.lock) {
+recentEvents.addAll(stats.recentEvents);
+}
+}
+}
+
 public void addProvisionEvent(AutoScalingData data)
 {
 synchronized (lock) {

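The new addAll() copies another instance's recent events while holding both locks, so several ScalingStats can be folded into one report without racing concurrent event recording. A minimal sketch of that use — the helper name is illustrative and assumes the caller already holds the instances it wants to merge:

import io.druid.indexing.overlord.autoscaling.ScalingStats;

// Illustrative helper (not part of the patch): fold the events recorded by several
// ScalingStats instances into the target, relying on addAll() taking both locks.
class ScalingStatsMerger
{
  static ScalingStats mergeInto(ScalingStats target, Iterable<ScalingStats> others)
  {
    for (ScalingStats other : others) {
      target.addAll(other);
    }
    return target;
  }
}
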
SimpleWorkerProvisioningStrategy.java
@@ -35,6 +35,7 @@ import io.druid.indexing.overlord.ImmutableWorkerInfo;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
 import io.druid.indexing.overlord.WorkerTaskRunner;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.worker.Worker;
 import io.druid.java.util.common.DateTimes;
 import org.joda.time.DateTime;
@@ -118,9 +119,9 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning
 Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
 Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
 boolean didProvision = false;
-final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
-if (workerConfig == null || workerConfig.getAutoScaler() == null) {
-log.error("No workerConfig available, cannot provision new workers.");
+final DefaultWorkerBehaviorConfig workerConfig =
+PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+if (workerConfig == null) {
 return false;
 }

@@ -183,9 +184,9 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning
 public synchronized boolean doTerminate()
 {
 Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
-final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
+final DefaultWorkerBehaviorConfig workerConfig =
+PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
 if (workerConfig == null) {
-log.warn("No workerConfig available, cannot terminate workers.");
 return false;
 }

@@ -208,14 +209,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning
 )
 );

-final Set<String> stillExisting = Sets.newHashSet();
-for (String s : currentlyTerminating) {
-if (workerNodeIds.contains(s)) {
-stillExisting.add(s);
-}
-}
-currentlyTerminating.clear();
-currentlyTerminating.addAll(stillExisting);
+currentlyTerminating.retainAll(workerNodeIds);

 Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
 updateTargetWorkerCount(workerConfig, pendingTasks, workers);
@@ -277,7 +271,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning


 private void updateTargetWorkerCount(
-final WorkerBehaviorConfig workerConfig,
+final DefaultWorkerBehaviorConfig workerConfig,
 final Collection<? extends TaskRunnerWorkItem> pendingTasks,
 final Collection<ImmutableWorkerInfo> zkWorkers
 )

OverlordResource.java
@@ -31,6 +31,7 @@ import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import io.druid.audit.AuditEntry;
 import io.druid.audit.AuditInfo;
 import io.druid.audit.AuditManager;
 import io.druid.common.config.JacksonConfigManager;
@@ -291,14 +292,12 @@ public class OverlordResource
 Interval theInterval = interval == null ? null : Intervals.of(interval);
 if (theInterval == null && count != null) {
 try {
-return Response.ok(
-auditManager.fetchAuditHistory(
-WorkerBehaviorConfig.CONFIG_KEY,
-WorkerBehaviorConfig.CONFIG_KEY,
-count
-)
-)
-.build();
+List<AuditEntry> workerEntryList = auditManager.fetchAuditHistory(
+WorkerBehaviorConfig.CONFIG_KEY,
+WorkerBehaviorConfig.CONFIG_KEY,
+count
+);
+return Response.ok(workerEntryList).build();
 }
 catch (IllegalArgumentException e) {
 return Response.status(Response.Status.BAD_REQUEST)
@@ -306,14 +305,12 @@ public class OverlordResource
 .build();
 }
 }
-return Response.ok(
-auditManager.fetchAuditHistory(
-WorkerBehaviorConfig.CONFIG_KEY,
-WorkerBehaviorConfig.CONFIG_KEY,
-theInterval
-)
-)
-.build();
+List<AuditEntry> workerEntryList = auditManager.fetchAuditHistory(
+WorkerBehaviorConfig.CONFIG_KEY,
+WorkerBehaviorConfig.CONFIG_KEY,
+theInterval
+);
+return Response.ok(workerEntryList).build();
 }

 @POST

DefaultWorkerBehaviorConfig.java (new file)
@@ -0,0 +1,102 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.setup;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexing.overlord.autoscaling.AutoScaler;
+import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
+
+/**
+ */
+public class DefaultWorkerBehaviorConfig implements WorkerBehaviorConfig
+{
+private static final AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
+
+public static DefaultWorkerBehaviorConfig defaultConfig()
+{
+return new DefaultWorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER);
+}
+
+private final WorkerSelectStrategy selectStrategy;
+private final AutoScaler autoScaler;
+
+@JsonCreator
+public DefaultWorkerBehaviorConfig(
+@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy,
+@JsonProperty("autoScaler") AutoScaler autoScaler
+)
+{
+this.selectStrategy = selectStrategy;
+this.autoScaler = autoScaler;
+}
+
+@Override
+@JsonProperty
+public WorkerSelectStrategy getSelectStrategy()
+{
+return selectStrategy;
+}
+
+@JsonProperty
+public AutoScaler<?> getAutoScaler()
+{
+return autoScaler;
+}
+
+@Override
+public boolean equals(Object o)
+{
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+DefaultWorkerBehaviorConfig that = (DefaultWorkerBehaviorConfig) o;
+
+if (autoScaler != null ? !autoScaler.equals(that.autoScaler) : that.autoScaler != null) {
+return false;
+}
+if (selectStrategy != null ? !selectStrategy.equals(that.selectStrategy) : that.selectStrategy != null) {
+return false;
+}
+
+return true;
+}
+
+@Override
+public int hashCode()
+{
+int result = selectStrategy != null ? selectStrategy.hashCode() : 0;
+result = 31 * result + (autoScaler != null ? autoScaler.hashCode() : 0);
+return result;
+}
+
+@Override
+public String toString()
+{
+return "WorkerConfiguration{" +
+"selectStrategy=" + selectStrategy +
+", autoScaler=" + autoScaler +
+'}';
+}
+}

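For context on how this class is wired in: the tests further down in this commit hand the task runner a Supplier<WorkerBehaviorConfig> built from an AtomicReference, and defaultConfig() pairs the default select strategy with a NoopAutoScaler, so a cluster without an autoscaling extension still gets a valid dynamic config. A minimal sketch of that wiring (the wrapper class and method names are illustrative, not part of the patch):

import com.google.common.base.Supplier;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;

import java.util.concurrent.atomic.AtomicReference;

// Illustrative: expose the default config through the Supplier<WorkerBehaviorConfig>
// shape that the provisioning strategies and task runners consume.
class DefaultConfigSupplierExample
{
  static Supplier<WorkerBehaviorConfig> defaultConfigSupplier()
  {
    return DSuppliers.of(new AtomicReference<WorkerBehaviorConfig>(DefaultWorkerBehaviorConfig.defaultConfig()));
  }
}
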
WorkerBehaviorConfig.java
@@ -19,85 +19,28 @@

 package io.druid.indexing.overlord.setup;

-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexing.overlord.autoscaling.AutoScaler;
-import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.guice.annotations.ExtensionPoint;

 /**
+ * Outside of {@link io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy} and
+ * {@link io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy}, WorkerBehaviorConfig is used only
+ * in {@link io.druid.indexing.overlord.TaskRunner}, and only {@link #getSelectStrategy()} method is used. That is why
+ * the WorkerBehaviorConfig's interface is minimized to just this method. PendingTaskBasedWorkerProvisioningStrategy and
+ * SimpleWorkerProvisioningStrategy are written to work only with {@link DefaultWorkerBehaviorConfig}. Extension-defined
+ * WorkerBehaviorConfig implementations should likely be used with different extension-defined {@link
+ * io.druid.indexing.overlord.autoscaling.ProvisioningStrategy} implementations as well.
 */
-public class WorkerBehaviorConfig
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultWorkerBehaviorConfig.class)
+@JsonSubTypes(value = {
+@JsonSubTypes.Type(name = "default", value = DefaultWorkerBehaviorConfig.class)
+})
+@ExtensionPoint
+public interface WorkerBehaviorConfig
 {
-public static final String CONFIG_KEY = "worker.config";
-public static WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(null);
-public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
+String CONFIG_KEY = "worker.config";
+WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(null);

-public static WorkerBehaviorConfig defaultConfig()
+WorkerSelectStrategy getSelectStrategy();
-{
-return new WorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER);
-}
-
-private final WorkerSelectStrategy selectStrategy;
-private final AutoScaler autoScaler;
-
-@JsonCreator
-public WorkerBehaviorConfig(
-@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy,
-@JsonProperty("autoScaler") AutoScaler autoScaler
-)
-{
-this.selectStrategy = selectStrategy;
-this.autoScaler = autoScaler;
-}
-
-@JsonProperty
-public WorkerSelectStrategy getSelectStrategy()
-{
-return selectStrategy;
-}
-
-@JsonProperty
-public AutoScaler<?> getAutoScaler()
-{
-return autoScaler;
-}
-
-@Override
-public boolean equals(Object o)
-{
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-
-WorkerBehaviorConfig that = (WorkerBehaviorConfig) o;
-
-if (autoScaler != null ? !autoScaler.equals(that.autoScaler) : that.autoScaler != null) {
-return false;
-}
-if (selectStrategy != null ? !selectStrategy.equals(that.selectStrategy) : that.selectStrategy != null) {
-return false;
-}
-
-return true;
-}
-
-@Override
-public int hashCode()
-{
-int result = selectStrategy != null ? selectStrategy.hashCode() : 0;
-result = 31 * result + (autoScaler != null ? autoScaler.hashCode() : 0);
-return result;
-}
-
-@Override
-public String toString()
-{
-return "WorkerConfiguration{" +
-"selectStrategy=" + selectStrategy +
-", autoScaler=" + autoScaler +
-'}';
-}
 }

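The new Javadoc above suggests that an extension-defined WorkerBehaviorConfig should normally ship with an extension-defined ProvisioningStrategy. A minimal sketch of what such an extension class could look like under the reduced interface — the package, class, and type name are hypothetical, not part of the patch:

package my.extension.config;  // hypothetical extension package

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;

// Hypothetical extension config: the trimmed-down interface only requires getSelectStrategy(),
// so an extension can carry its own provisioning settings elsewhere (e.g. in its strategy).
@JsonTypeName("myCustom")
public class MyWorkerBehaviorConfig implements WorkerBehaviorConfig
{
  private final WorkerSelectStrategy selectStrategy;

  @JsonCreator
  public MyWorkerBehaviorConfig(@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy)
  {
    this.selectStrategy = selectStrategy == null ? DEFAULT_STRATEGY : selectStrategy;
  }

  @Override
  @JsonProperty
  public WorkerSelectStrategy getSelectStrategy()
  {
    return selectStrategy;
  }
}

Making the "myCustom" type id resolvable wherever worker.config payloads are deserialized would go through the extension's Jackson module registration, assuming the usual DruidModule mechanism (again a sketch, with hypothetical names):

package my.extension;  // hypothetical

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import my.extension.config.MyWorkerBehaviorConfig;

import java.util.Collections;
import java.util.List;

public class MyProvisioningDruidModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // Register the subtype so @JsonTypeName("myCustom") is recognized on deserialization.
    SimpleModule module = new SimpleModule("my-provisioning");
    module.registerSubtypes(MyWorkerBehaviorConfig.class);
    return Collections.singletonList(module);
  }

  @Override
  public void configure(Binder binder)
  {
    // Guice bindings, e.g. for a matching extension-defined ProvisioningStrategy, would go here.
  }
}
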
WorkerSelectStrategy.java
@@ -22,6 +22,7 @@ package io.druid.indexing.overlord.setup;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.collect.ImmutableMap;
+import io.druid.guice.annotations.PublicApi;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.ImmutableWorkerInfo;
 import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -39,6 +40,7 @@ import javax.annotation.Nullable;
 @JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class),
 @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
 })
+@PublicApi
 public interface WorkerSelectStrategy
 {
 /**

Worker.java
@@ -21,10 +21,12 @@ package io.druid.indexing.worker;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.guice.annotations.PublicApi;

 /**
 * A container for worker metadata.
 */
+@PublicApi
 public class Worker
 {
 private final String scheme;

OverlordBlinkLeadershipTest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Supplier;
 import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
 import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
 import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.joda.time.Period;
 import org.junit.After;
@@ -34,11 +35,11 @@ public class OverlordBlinkLeadershipTest
 {
 private RemoteTaskRunnerTestUtils rtrUtils;
 private final TestRemoteTaskRunnerConfig remoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(new Period("PT5M"));
-private final WorkerBehaviorConfig defaultWorkerBehaviourConfig = WorkerBehaviorConfig.defaultConfig();
+private final DefaultWorkerBehaviorConfig defaultWorkerBehaviourConfig = DefaultWorkerBehaviorConfig.defaultConfig();
 private final Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier = new Supplier<WorkerBehaviorConfig>()
 {
 @Override
-public WorkerBehaviorConfig get()
+public DefaultWorkerBehaviorConfig get()
 {
 return defaultWorkerBehaviourConfig;
 }

RemoteTaskRunnerTestUtils.java
@@ -36,6 +36,7 @@ import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
 import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.worker.TaskAnnouncement;
 import io.druid.indexing.worker.Worker;
 import io.druid.java.util.common.StringUtils;
@@ -130,7 +131,7 @@ public class RemoteTaskRunnerTestUtils
 cf,
 new PathChildrenCacheFactory.Builder(),
 null,
-DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
+DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
 provisioningStrategy
 );

PendingTaskBasedProvisioningStrategyTest.java
@@ -38,8 +38,9 @@ import io.druid.indexing.overlord.RemoteTaskRunner;
 import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
 import io.druid.indexing.overlord.ZkWorker;
 import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
-import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.worker.TaskAnnouncement;
 import io.druid.indexing.worker.Worker;
 import io.druid.jackson.DefaultObjectMapper;
@@ -85,7 +86,7 @@ public class PendingTaskBasedProvisioningStrategyTest
 .setMaxScalingStep(2);

 workerConfig = new AtomicReference<>(
-new WorkerBehaviorConfig(
+new DefaultWorkerBehaviorConfig(
 new FillCapacityWorkerSelectStrategy(null),
 autoScaler
 )

SimpleProvisioningStrategyTest.java
@@ -39,6 +39,7 @@ import io.druid.indexing.overlord.RemoteTaskRunner;
 import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
 import io.druid.indexing.overlord.ZkWorker;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import io.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import io.druid.indexing.worker.TaskAnnouncement;
 import io.druid.indexing.worker.Worker;
 import io.druid.jackson.DefaultObjectMapper;
@@ -84,7 +85,7 @@ public class SimpleProvisioningStrategyTest
 final ProvisioningSchedulerConfig schedulerConfig = new ProvisioningSchedulerConfig();

 workerConfig = new AtomicReference<>(
-new WorkerBehaviorConfig(
+new DefaultWorkerBehaviorConfig(
 null,
 autoScaler
 )

OverlordTest.java
@@ -410,7 +410,7 @@ public class OverlordTest
 }

 @Override
-public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
+public Collection<TaskRunnerWorkItem> getPendingTasks()
 {
 return ImmutableList.of();
 }

WorkerBehaviorConfigTest.java
@@ -40,7 +40,7 @@ public class WorkerBehaviorConfigTest
 @Test
 public void testSerde() throws Exception
 {
-WorkerBehaviorConfig config = new WorkerBehaviorConfig(
+DefaultWorkerBehaviorConfig config = new DefaultWorkerBehaviorConfig(
 new FillCapacityWithAffinityWorkerSelectStrategy(
 new AffinityConfig(
 ImmutableMap.of("foo", ImmutableSet.of("localhost")),
@@ -87,6 +87,6 @@ public class WorkerBehaviorConfigTest
 }
 }
 );
-Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), WorkerBehaviorConfig.class));
+Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), DefaultWorkerBehaviorConfig.class));
 }
 }

CliOverlord.java
@@ -275,7 +275,6 @@ public class CliOverlord extends ServerRunnable
 );
 biddy.addBinding("simple").to(SimpleWorkerProvisioningStrategy.class);
 biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerProvisioningStrategy.class);
-
 }

 private void configureOverlordHelpers(Binder binder)