add pending task based resource management strategy (#2086)

This commit is contained in:
Nishant 2016-04-27 23:10:53 +05:30 committed by Xavier Léauté
parent bf5e5e7b75
commit c29cb7d711
23 changed files with 1437 additions and 205 deletions

View File

@ -382,6 +382,17 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return ImmutableList.copyOf(pendingTasks.values());
}
/**
 * Returns the payloads of the tasks that are currently pending.
 *
 * @return an immutable copy of the values held in {@code pendingTaskPayloads} at the time of the call
 */
@Override
public Collection<Task> getPendingTaskPayloads()
{
  // Copy the values so callers receive a stable snapshot instead of a live view of the map.
  return ImmutableList.copyOf(pendingTaskPayloads.values());
}
/**
 * Returns the configuration held by this runner.
 *
 * @return the {@link RemoteTaskRunnerConfig} stored in the {@code config} field
 */
@Override
public RemoteTaskRunnerConfig getConfig()
{
  return config;
}
@Override
public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
{

View File

@ -30,8 +30,8 @@ import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
@ -44,15 +44,14 @@ import java.util.concurrent.ScheduledExecutorService;
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
public static final String TYPE_NAME = "remote";
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final IndexerZkConfig zkPaths;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final SimpleResourceManagementConfig config;
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ScheduledExecutorFactory factory;
@Inject
@ -64,8 +63,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ScheduledExecutorFactory factory,
final SimpleResourceManagementConfig config,
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
final ResourceManagementStrategy resourceManagementStrategy
)
{
this.curator = curator;
@ -74,25 +73,14 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.config = config;
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.resourceManagementStrategy = resourceManagementStrategy;
this.factory = factory;
}
@Override
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<WorkerTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
factory.create(1, "RemoteTaskRunner-ResourceManagement--%d")
);
} else {
resourceManagementStrategy = new NoopResourceManagementStrategy<>();
}
return new RemoteTaskRunner(
jsonMapper,
remoteTaskRunnerConfig,
@ -105,7 +93,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),
resourceManagementStrategy
resourceManagementSchedulerConfig.isDoAutoscale()
? resourceManagementStrategy
: new NoopResourceManagementStrategy<>()
);
}
}

View File

@ -19,6 +19,8 @@
package io.druid.indexing.overlord;
import com.google.common.base.Predicate;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import java.util.Collection;
@ -44,4 +46,9 @@ public interface WorkerTaskRunner extends TaskRunner
* @return
*/
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
/**
 * Returns this runner's configuration.
 *
 * @return the {@link WorkerTaskRunnerConfig} of this runner
 */
WorkerTaskRunnerConfig getConfig();
/**
 * Returns the payloads of the tasks that are pending on this runner.
 *
 * NOTE(review): RemoteTaskRunner returns an immutable snapshot here; presumably every implementation
 * should do the same - confirm before relying on it.
 *
 * @return the pending {@link Task} payloads
 */
Collection<Task> getPendingTaskPayloads();
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.WorkerTaskRunner;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Base class for {@link ResourceManagementStrategy} implementations that manage a {@link WorkerTaskRunner}.
 *
 * It owns the scheduling only: {@link #startManagement} registers two periodic jobs on the supplied executor,
 * one invoking {@link #doProvision} and one invoking {@link #doTerminate}. Subclasses provide the actual
 * scaling decisions. {@link #stopManagement} shuts the supplied executor down.
 */
public abstract class AbstractWorkerResourceManagementStrategy implements ResourceManagementStrategy<WorkerTaskRunner>
{
  private static final EmittingLogger log = new EmittingLogger(AbstractWorkerResourceManagementStrategy.class);

  private final ResourceManagementSchedulerConfig schedulerConfig;
  private final ScheduledExecutorService exec;

  // Guards the start/stop transitions.
  private final Object lock = new Object();
  private volatile boolean started = false;

  /**
   * @param resourceManagementSchedulerConfig supplies the provision period, terminate period and origin time
   * @param exec                              executor on which both periodic jobs are scheduled
   */
  protected AbstractWorkerResourceManagementStrategy(
      ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
      ScheduledExecutorService exec
  )
  {
    this.schedulerConfig = resourceManagementSchedulerConfig;
    this.exec = exec;
  }

  /**
   * Schedules the periodic provision and terminate jobs for the given runner.
   * Does nothing when management has already been started.
   *
   * @param runner runner handed to {@link #doProvision} and {@link #doTerminate} on every run
   */
  @Override
  public void startManagement(final WorkerTaskRunner runner)
  {
    synchronized (lock) {
      if (!started) {
        log.info("Started Resource Management Scheduler");
        scheduleProvisioning(runner);
        scheduleTermination(runner);
        started = true;
      }
    }
  }

  /**
   * Decides on and performs worker termination. The scheduler in this class ignores the result.
   */
  abstract boolean doTerminate(WorkerTaskRunner runner);

  /**
   * Decides on and performs worker provisioning. The scheduler in this class ignores the result.
   */
  abstract boolean doProvision(WorkerTaskRunner runner);

  /**
   * Shuts down the executor that runs the periodic jobs.
   * Does nothing when management is not currently started.
   */
  @Override
  public void stopManagement()
  {
    synchronized (lock) {
      if (started) {
        log.info("Stopping Resource Management Scheduler");
        exec.shutdown();
        started = false;
      }
    }
  }

  /**
   * Registers the provisioning job, repeating at the configured provision period.
   */
  private void scheduleProvisioning(final WorkerTaskRunner runner)
  {
    final Duration provisionRate = schedulerConfig.getProvisionPeriod().toStandardDuration();
    ScheduledExecutors.scheduleAtFixedRate(
        exec,
        provisionRate,
        new Runnable()
        {
          @Override
          public void run()
          {
            // Any Errors are caught by ScheduledExecutors
            doProvision(runner);
          }
        }
    );
  }

  /**
   * Registers the termination job. The first run is delayed until the next boundary of the terminate
   * period (counted from the configured origin time); afterwards it repeats every terminate period.
   */
  private void scheduleTermination(final WorkerTaskRunner runner)
  {
    final Period terminatePeriod = schedulerConfig.getTerminatePeriod();
    final PeriodGranularity boundaries = new PeriodGranularity(
        terminatePeriod,
        schedulerConfig.getOriginTime(),
        null
    );
    final long firstRunMillis = boundaries.next(boundaries.truncate(new DateTime().getMillis()));

    ScheduledExecutors.scheduleAtFixedRate(
        exec,
        new Duration(System.currentTimeMillis(), firstRunMillis),
        schedulerConfig.getTerminatePeriod().toStandardDuration(),
        new Runnable()
        {
          @Override
          public void run()
          {
            // Any Errors are caught by ScheduledExecutors
            doTerminate(runner);
          }
        }
    );
  }
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorkerResourceManagementConfig
{
@JsonProperty
private int maxScalingStep = 10;
public int getMaxScalingStep()
{
return maxScalingStep;
}
public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingStep(int maxScalingStep)
{
this.maxScalingStep = maxScalingStep;
return this;
}
public PendingTaskBasedWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
{
super.setWorkerIdleTimeout(workerIdleTimeout);
return this;
}
public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
{
super.setMaxScalingDuration(maxScalingDuration);
return this;
}
public PendingTaskBasedWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
{
super.setNumEventsToTrack(numEventsToTrack);
return this;
}
public PendingTaskBasedWorkerResourceManagementConfig setWorkerVersion(String workerVersion)
{
super.setWorkerVersion(workerVersion);
return this;
}
public PendingTaskBasedWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
{
super.setPendingTaskTimeout(pendingTaskTimeout);
return this;
}
}

View File

@ -0,0 +1,412 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Worker resource management strategy that sizes the worker pool from the pending task payloads.
 *
 * Provisioning simulates assigning every pending task with the configured worker select strategy and
 * requests the number of additional workers that simulation needs; termination retires lazy workers
 * through the autoscaler. Both operations run under a single lock.
 */
public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy
{
private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class);
private final PendingTaskBasedWorkerResourceManagementConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
// Records provision and terminate events; exposed through getStats().
private final ScalingStats scalingStats;
// Guards the bodies of doProvision and doTerminate and the mutable state below.
private final Object lock = new Object();
// Node ids returned by the autoscaler's provision() that have not yet shown up among the runner's workers.
private final Set<String> currentlyProvisioning = Sets.newHashSet();
// Node ids returned by the autoscaler's terminate() that are still reported among the lazy workers.
private final Set<String> currentlyTerminating = Sets.newHashSet();
// Time of the most recent successful provision / terminate request; used for the "taking too long" alerts.
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
/**
 * Injected constructor: creates a single-threaded scheduled executor from the factory and delegates
 * to the executor-based constructor.
 *
 * @param config                            strategy configuration
 * @param workerConfigRef                   supplier of the current worker behavior config
 * @param resourceManagementSchedulerConfig scheduling configuration passed on to the base class
 * @param factory                           factory used to create the management executor
 */
@Inject
public PendingTaskBasedWorkerResourceManagementStrategy(
PendingTaskBasedWorkerResourceManagementConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorFactory factory
)
{
this(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
// one thread, named after this strategy
factory.create(1, "PendingTaskBasedResourceManagement-manager--%d")
);
}
public PendingTaskBasedWorkerResourceManagementStrategy(
PendingTaskBasedWorkerResourceManagementConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
)
{
super(resourceManagementSchedulerConfig, exec);
this.config = config;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
/**
 * Requests new workers from the autoscaler when the pending tasks call for them.
 *
 * Nodes from earlier requests that now appear among the runner's workers are dropped from the
 * in-flight set first. While any earlier request is still in flight no new request is made; if that
 * wait exceeds the configured max scaling duration an alert is emitted, the outstanding nodes are
 * terminated and the in-flight set is cleared.
 *
 * @param runner runner whose pending task payloads, workers and config are inspected
 * @return true if at least one provision call returned node ids during this invocation
 */
@Override
public boolean doProvision(WorkerTaskRunner runner)
{
  // Read the runner state before taking the lock.
  final Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
  final Collection<ImmutableWorkerInfo> workers = runner.getWorkers();

  synchronized (lock) {
    final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
    if (workerConfig == null || workerConfig.getAutoScaler() == null) {
      log.error("No workerConfig available, cannot provision new workers.");
      return false;
    }

    // Anything that has registered as a worker is no longer "provisioning".
    final Collection<Worker> registeredWorkers = Collections2.transform(
        workers,
        new Function<ImmutableWorkerInfo, Worker>()
        {
          @Override
          public Worker apply(ImmutableWorkerInfo input)
          {
            return input.getWorker();
          }
        }
    );
    currentlyProvisioning.removeAll(getWorkerNodeIDs(registeredWorkers, workerConfig));

    if (!currentlyProvisioning.isEmpty()) {
      // An earlier request is still outstanding: wait, and give up on it once it has taken too long.
      final Duration waited = new Duration(lastProvisionTime, new DateTime());
      log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, waited);
      if (waited.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
        log.makeAlert("Worker node provisioning taking too long!")
           .addData("millisSinceLastProvision", waited.getMillis())
           .addData("provisioningCount", currentlyProvisioning.size())
           .emit();
        workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning));
        currentlyProvisioning.clear();
      }
      return false;
    }

    boolean provisionedAny = false;
    int stillWanted = getScaleUpNodeCount(runner.getConfig(), workerConfig, pendingTasks, workers);
    while (stillWanted > 0) {
      final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
      final List<String> newNodes = provisioned == null ? ImmutableList.<String>of() : provisioned.getNodeIds();
      if (newNodes.isEmpty()) {
        log.warn("NewNodes is empty, returning from provision loop");
        break;
      }
      currentlyProvisioning.addAll(newNodes);
      lastProvisionTime = new DateTime();
      scalingStats.addProvisionEvent(provisioned);
      stillWanted -= provisioned.getNodeIds().size();
      provisionedAny = true;
    }
    return provisionedAny;
  }
}
/**
 * Translates workers into autoscaler node ids by passing their IPs to the autoscaler's
 * {@code ipToIdLookup}.
 *
 * @param workers      workers whose IPs are looked up
 * @param workerConfig config providing the autoscaler
 * @return whatever the autoscaler's lookup returns for those IPs
 */
private static Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig)
{
  final List<String> workerIps = Lists.newArrayList();
  for (Worker worker : workers) {
    workerIps.add(worker.getIp());
  }
  return workerConfig.getAutoScaler().ipToIdLookup(workerIps);
}
/**
 * Computes how many additional workers should be requested from the autoscaler.
 *
 * The result is the larger of (a) the shortfall against the autoscaler's minimum worker count and
 * (b) the workers needed for the pending tasks capped by {@code maxScalingStep}; it is then limited
 * so that the number of valid workers does not exceed the autoscaler's maximum worker count.
 *
 * @param remoteTaskRunnerConfig runner config handed to the worker select strategy during simulation
 * @param workerConfig           provides the autoscaler limits and the select strategy
 * @param pendingTasks           task payloads waiting for a worker
 * @param workers                workers currently known to the runner
 * @return number of workers to provision; 0 when the maximum worker count is already reached
 */
int getScaleUpNodeCount(
    final WorkerTaskRunnerConfig remoteTaskRunnerConfig,
    final WorkerBehaviorConfig workerConfig,
    final Collection<Task> pendingTasks,
    final Collection<ImmutableWorkerInfo> workers
)
{
  final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
  final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
  final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);
  final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();

  // If there are no valid workers, spin up minWorkerCount: the exact capacity needed cannot be
  // determined here because the expected worker capacity is unknown.
  final int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
      remoteTaskRunnerConfig,
      workerConfig,
      pendingTasks,
      workers
  );

  final int want = Math.max(
      // Additional workers needed to reach minWorkerCount
      minWorkerCount - currValidWorkers,
      // Additional workers needed to run current pending tasks, limited to one scaling step
      Math.min(config.getMaxScalingStep(), moreWorkersNeeded)
  );

  if (want > 0 && currValidWorkers >= maxWorkerCount) {
    // The message has two %d placeholders, so both counts must be supplied.
    log.warn(
        "Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].",
        currValidWorkers,
        maxWorkerCount
    );
    return 0;
  }
  return Math.min(want, maxWorkerCount - currValidWorkers);
}
/**
 * Counts the extra workers needed to place every pending task.
 *
 * The pending tasks are assigned one by one, using the configured worker select strategy, to a
 * simulated copy of the valid workers. Whenever the strategy finds no worker for a task, a dummy
 * worker is added to the simulation and counted.
 *
 * @param workerTaskRunnerConfig runner config passed to the select strategy; its min worker version
 *                               is used as the version of dummy workers
 * @param workerConfig           provides the worker select strategy
 * @param pendingTasks           task payloads to place
 * @param workers                workers currently known to the runner
 * @return number of dummy workers that had to be added
 */
int getWorkersNeededToAssignTasks(
    final WorkerTaskRunnerConfig workerTaskRunnerConfig,
    final WorkerBehaviorConfig workerConfig,
    final Collection<Task> pendingTasks,
    final Collection<ImmutableWorkerInfo> workers
)
{
  final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);

  // Simulated cluster state, keyed by worker host; starts out with the valid workers only.
  final Map<String, ImmutableWorkerInfo> simulatedWorkers = Maps.newHashMap();
  for (ImmutableWorkerInfo validWorker : Collections2.filter(workers, isValidWorker)) {
    simulatedWorkers.put(validWorker.getWorker().getHost(), validWorker);
  }

  final WorkerSelectStrategy selector = workerConfig.getSelectStrategy();
  final int dummyCapacity = getExpectedWorkerCapacity(workers);

  int extraWorkers = 0;
  for (Task pending : pendingTasks) {
    final Optional<ImmutableWorkerInfo> chosen = selector.findWorkerForTask(
        workerTaskRunnerConfig,
        ImmutableMap.copyOf(simulatedWorkers),
        pending
    );

    final ImmutableWorkerInfo assignee;
    if (!chosen.isPresent()) {
      // No simulated worker can take this task: add a dummy worker for it and count it.
      assignee = createDummyWorker(
          "dummy" + extraWorkers,
          dummyCapacity,
          workerTaskRunnerConfig.getMinWorkerVersion()
      );
      extraWorkers++;
    } else {
      assignee = chosen.get();
    }

    // Record the assignment so later tasks see the reduced free capacity.
    simulatedWorkers.put(assignee.getWorker().getHost(), workerWithTask(assignee, pending));
  }
  return extraWorkers;
}
/**
 * Retires lazy workers through the autoscaler.
 *
 * Nothing is terminated while a provision request is still in flight. Node ids from an earlier
 * terminate request that are no longer reported among the runner's lazy workers are dropped from
 * the in-flight set first. While any earlier termination is still in flight no new one is started;
 * if that wait exceeds the configured max scaling duration an alert is emitted and the in-flight
 * set is cleared.
 *
 * @param runner runner whose workers are inspected and marked lazy
 * @return true if the autoscaler returned termination data during this invocation
 */
@Override
public boolean doTerminate(WorkerTaskRunner runner)
{
  Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
  synchronized (lock) {
    final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
    // The autoscaler is dereferenced throughout this method, so guard it the same way doProvision does.
    if (workerConfig == null || workerConfig.getAutoScaler() == null) {
      log.warn("No workerConfig available, cannot terminate workers.");
      return false;
    }

    if (!currentlyProvisioning.isEmpty()) {
      log.debug("Already provisioning nodes, Not Terminating any nodes.");
      return false;
    }

    boolean didTerminate = false;
    final Collection<String> workerNodeIds = getWorkerNodeIDs(runner.getLazyWorkers(), workerConfig);

    // Keep only the in-flight terminations whose nodes are still reported as lazy workers.
    final Set<String> stillExisting = Sets.newHashSet();
    for (String s : currentlyTerminating) {
      if (workerNodeIds.contains(s)) {
        stillExisting.add(s);
      }
    }
    currentlyTerminating.clear();
    currentlyTerminating.addAll(stillExisting);

    if (currentlyTerminating.isEmpty()) {
      final int maxWorkersToTerminate = maxWorkersToTerminate(zkWorkers, workerConfig);
      final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
      final List<String> laziestWorkerIps =
          Lists.newArrayList(
              Collections2.transform(
                  runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate),
                  new Function<Worker, String>()
                  {
                    @Override
                    public String apply(Worker zkWorker)
                    {
                      return zkWorker.getIp();
                    }
                  }
              )
          );
      if (laziestWorkerIps.isEmpty()) {
        log.debug("Found no lazy workers");
      } else {
        log.info(
            "Terminating %,d lazy workers: %s",
            laziestWorkerIps.size(),
            Joiner.on(", ").join(laziestWorkerIps)
        );

        final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
        if (terminated != null) {
          currentlyTerminating.addAll(terminated.getNodeIds());
          lastTerminateTime = new DateTime();
          scalingStats.addTerminateEvent(terminated);
          didTerminate = true;
        }
      }
    } else {
      // An earlier termination is still outstanding: wait, and stop tracking it once it has taken too long.
      Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());

      log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);

      if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
        log.makeAlert("Worker node termination taking too long!")
           .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
           .addData("terminatingCount", currentlyTerminating.size())
           .emit();

        currentlyTerminating.clear();
      }
    }

    return didTerminate;
  }
}
/**
 * Computes the upper bound on workers that may be terminated in one run: every invalid worker, plus
 * the valid workers above the autoscaler's minimum, the latter capped by {@code maxScalingStep} and
 * never negative.
 *
 * @param zkWorkers    workers currently known to the runner
 * @param workerConfig provides the autoscaler's minimum worker count
 * @return maximum number of workers to terminate
 */
private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, WorkerBehaviorConfig workerConfig)
{
  final int validCount = Collections2.filter(
      zkWorkers,
      ResourceManagementUtil.createValidWorkerPredicate(config)
  ).size();
  final int invalidCount = zkWorkers.size() - validCount;
  final int validAboveMinimum = validCount - workerConfig.getAutoScaler().getMinNumWorkers();

  // Valid workers that may go: at most one scaling step, and none when at or below the minimum.
  final int retirableValid = Math.max(0, Math.min(config.getMaxScalingStep(), validAboveMinimum));
  return invalidCount + retirableValid;
}
/**
 * Returns the scaling statistics to which this strategy adds its provision and terminate events.
 *
 * @return the {@link ScalingStats} instance created in the constructor
 */
@Override
public ScalingStats getStats()
{
return scalingStats;
}
/**
 * Estimates the capacity of a worker that would be provisioned.
 *
 * @param workers workers currently known to the runner
 * @return 1 when there are no workers, otherwise the capacity of the first worker returned by the
 * collection's iterator (all workers are assumed to have the same capacity)
 */
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
{
  if (workers.isEmpty()) {
    // No existing workers assume capacity per worker as 1
    return 1;
  }
  // Assume all workers have same capacity
  final ImmutableWorkerInfo sample = workers.iterator().next();
  return sample.getWorker().getCapacity();
}
/**
 * Builds a copy of the given worker info as it would look with one more task assigned: used capacity
 * is incremented by one, the task's availability group and id are added to the respective sets, and
 * the timestamp argument is set to now.
 *
 * @param immutableWorker worker info to derive from
 * @param task            task being assigned in the simulation
 * @return a new {@link ImmutableWorkerInfo}; the input is not modified
 */
private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task)
{
  final Worker worker = immutableWorker.getWorker();
  final int usedCapacity = immutableWorker.getCurrCapacityUsed() + 1;
  final Set<String> availabilityGroups = Sets.union(
      immutableWorker.getAvailabilityGroups(),
      Sets.newHashSet(task.getTaskResource().getAvailabilityGroup())
  );
  final Set<String> runningTasks = Sets.union(
      immutableWorker.getRunningTasks(),
      Sets.newHashSet(task.getId())
  );
  return new ImmutableWorkerInfo(worker, usedCapacity, availabilityGroups, runningTasks, DateTime.now());
}
/**
 * Creates a placeholder worker for the assignment simulation: IP {@code "-2"}, no used capacity,
 * empty availability-group and running-task sets, timestamp set to now.
 *
 * @param host     host name of the placeholder
 * @param capacity capacity of the placeholder
 * @param version  version of the placeholder
 * @return the placeholder worker info
 */
private static ImmutableWorkerInfo createDummyWorker(String host, int capacity, String version)
{
  final Worker placeholder = new Worker(host, "-2", capacity, version);
  final Set<String> noAvailabilityGroups = Sets.<String>newHashSet();
  final Set<String> noRunningTasks = Sets.<String>newHashSet();
  return new ImmutableWorkerInfo(placeholder, 0, noAvailabilityGroups, noRunningTasks, DateTime.now());
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.metamx.common.ISE;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.worker.Worker;
/**
 * Static factories for the worker predicates used by the worker resource management strategies.
 */
public class ResourceManagementUtil
{
  /**
   * Creates a predicate that accepts workers whose version is valid against the config's worker version.
   * The version is read from the config on every evaluation; evaluation throws an {@link ISE} when it
   * is null.
   *
   * @param config config supplying the minimum worker version
   * @return the "valid worker" predicate
   */
  public static Predicate<ImmutableWorkerInfo> createValidWorkerPredicate(
      final SimpleWorkerResourceManagementConfig config
  )
  {
    return new ValidWorkerPredicate(config);
  }

  /**
   * Creates a predicate that accepts workers which either completed their last task at least the
   * configured idle timeout ago, or are not valid according to
   * {@link #createValidWorkerPredicate(SimpleWorkerResourceManagementConfig)}.
   *
   * @param config config supplying the idle timeout and the minimum worker version
   * @return the "lazy worker" predicate
   */
  public static Predicate<ImmutableWorkerInfo> createLazyWorkerPredicate(
      final SimpleWorkerResourceManagementConfig config
  )
  {
    return new LazyWorkerPredicate(config, createValidWorkerPredicate(config));
  }

  /**
   * Version check against the config's worker version.
   */
  private static class ValidWorkerPredicate implements Predicate<ImmutableWorkerInfo>
  {
    private final SimpleWorkerResourceManagementConfig config;

    ValidWorkerPredicate(SimpleWorkerResourceManagementConfig config)
    {
      this.config = config;
    }

    @Override
    public boolean apply(ImmutableWorkerInfo worker)
    {
      final String minVersion = config.getWorkerVersion();
      if (minVersion != null) {
        return worker.isValidVersion(minVersion);
      }
      throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
    }
  }

  /**
   * Idle-timeout check, falling back to "not a valid worker".
   */
  private static class LazyWorkerPredicate implements Predicate<ImmutableWorkerInfo>
  {
    private final SimpleWorkerResourceManagementConfig config;
    private final Predicate<ImmutableWorkerInfo> isValidWorker;

    LazyWorkerPredicate(SimpleWorkerResourceManagementConfig config, Predicate<ImmutableWorkerInfo> isValidWorker)
    {
      this.config = config;
      this.isValidWorker = isValidWorker;
    }

    @Override
    public boolean apply(ImmutableWorkerInfo worker)
    {
      final long idleMillis = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis();
      final long timeoutMillis = config.getWorkerIdleTimeout().toStandardDuration().getMillis();
      if (idleMillis >= timeoutMillis) {
        // Idle long enough; the validity check is not evaluated in this case.
        return true;
      }
      return !isValidWorker.apply(worker);
    }
  }
}

View File

@ -24,7 +24,7 @@ import org.joda.time.Period;
/**
*/
public class SimpleResourceManagementConfig
public class SimpleWorkerResourceManagementConfig
{
@JsonProperty
private Period workerIdleTimeout = new Period("PT90m");
@ -49,7 +49,7 @@ public class SimpleResourceManagementConfig
return workerIdleTimeout;
}
public SimpleResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
public SimpleWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
{
this.workerIdleTimeout = workerIdleTimeout;
return this;
@ -60,7 +60,7 @@ public class SimpleResourceManagementConfig
return maxScalingDuration;
}
public SimpleResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
public SimpleWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
{
this.maxScalingDuration = maxScalingDuration;
return this;
@ -71,7 +71,7 @@ public class SimpleResourceManagementConfig
return numEventsToTrack;
}
public SimpleResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
public SimpleWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
{
this.numEventsToTrack = numEventsToTrack;
return this;
@ -82,7 +82,7 @@ public class SimpleResourceManagementConfig
return pendingTaskTimeout;
}
public SimpleResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
public SimpleWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
{
this.pendingTaskTimeout = pendingTaskTimeout;
return this;
@ -93,7 +93,7 @@ public class SimpleResourceManagementConfig
return workerVersion;
}
public SimpleResourceManagementConfig setWorkerVersion(String workerVersion)
public SimpleWorkerResourceManagementConfig setWorkerVersion(String workerVersion)
{
this.workerVersion = workerVersion;
return this;
@ -105,7 +105,7 @@ public class SimpleResourceManagementConfig
return workerPort;
}
public SimpleResourceManagementConfig setWorkerPort(int workerPort)
public SimpleWorkerResourceManagementConfig setWorkerPort(int workerPort)
{
this.workerPort = workerPort;
return this;

View File

@ -29,11 +29,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.WorkerTaskRunner;
@ -41,7 +38,6 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.Collection;
import java.util.List;
@ -50,16 +46,12 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<WorkerTaskRunner>
public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy
{
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private static final EmittingLogger log = new EmittingLogger(SimpleWorkerResourceManagementStrategy.class);
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;
private volatile boolean started = false;
private final SimpleResourceManagementConfig config;
private final SimpleWorkerResourceManagementConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
@ -72,8 +64,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private DateTime lastTerminateTime = new DateTime();
@Inject
public SimpleResourceManagementStrategy(
SimpleResourceManagementConfig config,
public SimpleWorkerResourceManagementStrategy(
SimpleWorkerResourceManagementConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorFactory factory
@ -87,21 +79,21 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
);
}
public SimpleResourceManagementStrategy(
SimpleResourceManagementConfig config,
public SimpleWorkerResourceManagementStrategy(
SimpleWorkerResourceManagementConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
)
{
super(resourceManagementSchedulerConfig, exec);
this.config = config;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.exec = exec;
}
boolean doProvision(WorkerTaskRunner runner)
protected boolean doProvision(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
@ -112,7 +104,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
@ -214,16 +207,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
final Collection<String> laziestWorkerIps =
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<Worker, String>()
{
@Override
public String apply(Worker zkWorker)
public String apply(Worker worker)
{
return zkWorker.getIp();
return worker.getIp();
}
}
);
@ -237,7 +230,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps));
final AutoScalingData terminated = workerConfig.getAutoScaler()
.terminate(ImmutableList.copyOf(laziestWorkerIps));
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
@ -265,111 +259,6 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
/**
 * Schedules the periodic provisioning and termination jobs for the given runner on {@code exec}.
 * Calling this while management is already started is a no-op.
 *
 * @param runner task runner passed to {@code doProvision} and {@code doTerminate} on every cycle
 */
@Override
public void startManagement(final WorkerTaskRunner runner)
{
  synchronized (lock) {
    if (!started) {
      log.info("Started Resource Management Scheduler");

      final Runnable provisionJob = new Runnable()
      {
        @Override
        public void run()
        {
          doProvision(runner);
        }
      };
      ScheduledExecutors.scheduleAtFixedRate(
          exec,
          resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(),
          provisionJob
      );

      // Schedule termination of worker nodes periodically. The initial delay runs from the
      // current time until granularity.next(granularity.truncate(now)), with the granularity
      // built from the terminate period and the configured origin time.
      final Period terminatePeriod = resourceManagementSchedulerConfig.getTerminatePeriod();
      final PeriodGranularity terminateGranularity = new PeriodGranularity(
          terminatePeriod,
          resourceManagementSchedulerConfig.getOriginTime(),
          null
      );
      final long nowMillis = new DateTime().getMillis();
      final long firstTerminateMillis = terminateGranularity.next(terminateGranularity.truncate(nowMillis));

      final Runnable terminateJob = new Runnable()
      {
        @Override
        public void run()
        {
          doTerminate(runner);
        }
      };
      ScheduledExecutors.scheduleAtFixedRate(
          exec,
          new Duration(System.currentTimeMillis(), firstTerminateMillis),
          resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(),
          terminateJob
      );

      started = true;
    }
  }
}
/**
 * Shuts down the scheduling executor and marks management as stopped.
 * Does nothing when management is not currently started.
 */
@Override
public void stopManagement()
{
  synchronized (lock) {
    if (started) {
      log.info("Stopping Resource Management Scheduler");
      exec.shutdown();
      started = false;
    }
  }
}
/**
 * @return the scaling statistics object held by this strategy
 */
@Override
public ScalingStats getStats()
{
  return this.scalingStats;
}
/**
 * Builds a predicate that accepts a worker when either the time since its last completed task
 * is at least the configured worker idle timeout, or the worker fails the valid-worker check.
 *
 * @param config supplies the worker idle timeout and the settings used by the valid-worker predicate
 * @return predicate selecting workers considered lazy
 */
private static Predicate<ImmutableWorkerInfo> createLazyWorkerPredicate(
    final SimpleResourceManagementConfig config
)
{
  final Predicate<ImmutableWorkerInfo> validWorkerCheck = createValidWorkerPredicate(config);

  return new Predicate<ImmutableWorkerInfo>()
  {
    @Override
    public boolean apply(ImmutableWorkerInfo worker)
    {
      final long idleMillis = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis();
      final long idleTimeoutMillis = config.getWorkerIdleTimeout().toStandardDuration().getMillis();
      if (idleMillis >= idleTimeoutMillis) {
        return true;
      }
      return !validWorkerCheck.apply(worker);
    }
  };
}
/**
 * Builds a predicate that accepts a worker when {@code worker.isValidVersion} holds for the
 * worker version read from the config. The version is read each time the predicate is applied.
 *
 * @param config supplies the minimum worker version
 * @return predicate selecting workers with a valid version; applying it throws ISE when the
 *         configured version is null
 */
private static Predicate<ImmutableWorkerInfo> createValidWorkerPredicate(
    final SimpleResourceManagementConfig config
)
{
  return new Predicate<ImmutableWorkerInfo>()
  {
    @Override
    public boolean apply(ImmutableWorkerInfo worker)
    {
      final String requiredVersion = config.getWorkerVersion();
      if (requiredVersion != null) {
        return worker.isValidVersion(requiredVersion);
      }
      throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
    }
  };
}
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
@ -380,9 +269,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) {
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config)
ResourceManagementUtil.createValidWorkerPredicate(config)
);
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
@ -469,4 +358,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
return runner.getWorkers();
}
/**
 * @return the scaling statistics object held by this strategy
 */
@Override
public ScalingStats getStats()
{
  return this.scalingStats;
}
}

View File

@ -39,7 +39,7 @@ import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import java.util.List;
@ -54,7 +54,7 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
private final int maxNumWorkers;
private final EC2EnvironmentConfig envConfig;
private final AmazonEC2 amazonEC2Client;
private final SimpleResourceManagementConfig config;
private final SimpleWorkerResourceManagementConfig config;
@JsonCreator
public EC2AutoScaler(
@ -62,7 +62,7 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("envConfig") EC2EnvironmentConfig envConfig,
@JacksonInject AmazonEC2 amazonEC2Client,
@JacksonInject SimpleResourceManagementConfig config
@JacksonInject SimpleWorkerResourceManagementConfig config
)
{
this.minNumWorkers = minNumWorkers;

View File

@ -28,7 +28,7 @@ import javax.validation.constraints.NotNull;
/**
*/
public class RemoteTaskRunnerConfig
public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
{
@JsonProperty
@NotNull
@ -38,9 +38,6 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskCleanupTimeout = new Period("PT15M");
@JsonProperty
private String minWorkerVersion = "0";
@JsonProperty
@Min(10 * 1024)
private int maxZnodeBytes = CuratorUtils.DEFAULT_MAX_ZNODE_BYTES;
@ -62,11 +59,6 @@ public class RemoteTaskRunnerConfig
return taskCleanupTimeout;
}
public String getMinWorkerVersion()
{
return minWorkerVersion;
}
public int getMaxZnodeBytes()
{
return maxZnodeBytes;
@ -107,7 +99,7 @@ public class RemoteTaskRunnerConfig
if (!taskCleanupTimeout.equals(that.taskCleanupTimeout)) {
return false;
}
if (!minWorkerVersion.equals(that.minWorkerVersion)) {
if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) {
return false;
}
return taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout);
@ -119,7 +111,7 @@ public class RemoteTaskRunnerConfig
{
int result = taskAssignmentTimeout.hashCode();
result = 31 * result + taskCleanupTimeout.hashCode();
result = 31 * result + minWorkerVersion.hashCode();
result = 31 * result + getMinWorkerVersion().hashCode();
result = 31 * result + maxZnodeBytes;
result = 31 * result + taskShutdownLinkTimeout.hashCode();
result = 31 * result + pendingTasksRunnerNumThreads;
@ -132,7 +124,7 @@ public class RemoteTaskRunnerConfig
return "RemoteTaskRunnerConfig{" +
"taskAssignmentTimeout=" + taskAssignmentTimeout +
", taskCleanupTimeout=" + taskCleanupTimeout +
", minWorkerVersion='" + minWorkerVersion + '\'' +
", minWorkerVersion='" + getMinWorkerVersion() + '\'' +
", maxZnodeBytes=" + maxZnodeBytes +
", taskShutdownLinkTimeout=" + taskShutdownLinkTimeout +
", pendingTasksRunnerNumThreads=" + pendingTasksRunnerNumThreads +

View File

@ -0,0 +1,33 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.config;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Task runner configuration holding the minimum worker version, bound as a JSON property.
 */
public class WorkerTaskRunnerConfig
{
  @JsonProperty
  private String minWorkerVersion = "0";

  /**
   * @return the minimum worker version; {@code "0"} unless a different value was bound
   */
  public String getMinWorkerVersion()
  {
    return this.minWorkerVersion;
  }
}

View File

@ -26,6 +26,7 @@ import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.Comparator;
import java.util.TreeSet;
@ -36,7 +37,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
{
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.List;
import java.util.Set;
@ -59,7 +60,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)

View File

@ -26,6 +26,7 @@ import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import java.util.Comparator;
import java.util.TreeSet;
@ -36,7 +37,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.script.Compilable;
import javax.script.Invocable;
@ -39,7 +40,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
{
public static interface SelectorFunction
{
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task);
public String apply(WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task);
}
private final SelectorFunction fnSelector;
@ -62,7 +63,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
String worker = fnSelector.apply(config, zkWorkers, task);

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
/**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
@ -49,7 +50,7 @@ public interface WorkerSelectStrategy
* @return A {@link io.druid.indexing.overlord.ImmutableWorkerInfo} to run the task if one is available.
*/
Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
);

View File

@ -29,10 +29,10 @@ import com.metamx.http.client.HttpClient;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
@ -109,7 +109,7 @@ public class RemoteTaskRunnerFactoryTest
return ScheduledExecutors.fixed(i, s);
}
};
SimpleResourceManagementConfig resourceManagementConfig = new SimpleResourceManagementConfig();
SimpleWorkerResourceManagementConfig resourceManagementConfig = new SimpleWorkerResourceManagementConfig();
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig = new ResourceManagementSchedulerConfig()
{
@Override
@ -126,14 +126,19 @@ public class RemoteTaskRunnerFactoryTest
httpClient,
workerBehaviorConfig,
executorFactory,
resourceManagementConfig,
resourceManagementSchedulerConfig
resourceManagementSchedulerConfig,
new SimpleWorkerResourceManagementStrategy(
resourceManagementConfig,
workerBehaviorConfig,
resourceManagementSchedulerConfig,
executorFactory
)
);
Assert.assertEquals(0, executorCount.get());
Assert.assertEquals(1, executorCount.get());
RemoteTaskRunner remoteTaskRunner1 = factory.build();
Assert.assertEquals(2, executorCount.get());
RemoteTaskRunner remoteTaskRunner2 = factory.build();
Assert.assertEquals(4, executorCount.get());
Assert.assertEquals(3, executorCount.get());
}
}

View File

@ -66,7 +66,7 @@ public class EC2AutoScalerTest
private DescribeInstancesResult describeInstancesResult;
private Reservation reservation;
private Instance instance;
private SimpleResourceManagementConfig managementConfig;
private SimpleWorkerResourceManagementConfig managementConfig;
@Before
public void setUp() throws Exception
@ -81,7 +81,7 @@ public class EC2AutoScalerTest
.withImageId(AMI_ID)
.withPrivateIpAddress(IP);
managementConfig = new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion("");
managementConfig = new SimpleWorkerResourceManagementConfig().setWorkerPort(8080).setWorkerVersion("");
}
@After

View File

@ -0,0 +1,604 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class PendingTaskBasedResourceManagementStrategyTest
{
// EasyMock mock created in setUp; each test records its own expectations on it.
private AutoScaler autoScaler;
// Task handed to the TestZkWorker instances that the tests report as running workers.
private Task testTask;
// Strategy configuration built in setUp.
private PendingTaskBasedWorkerResourceManagementConfig config;
// Strategy under test.
private PendingTaskBasedWorkerResourceManagementStrategy strategy;
// Backing reference for the supplier given to the strategy; testNullWorkerConfig sets it to null.
private AtomicReference<WorkerBehaviorConfig> workerConfig;
// Executor passed to the strategy constructor.
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
// Worker version set on the config in setUp and used as the default version of TestZkWorker.
private final static String MIN_VERSION = "2014-01-00T00:01:00Z";
// Version given to workers that the tests mark as invalid-version nodes.
private final static String INVALID_VERSION = "0";
/**
 * Creates a fresh autoscaler mock, strategy configuration, worker behavior config and the
 * strategy under test before every test method.
 */
@Before
public void setUp() throws Exception
{
  autoScaler = EasyMock.createMock(AutoScaler.class);
  testTask = TestTasks.immediateSuccess("task1");

  config = new PendingTaskBasedWorkerResourceManagementConfig()
      .setMaxScalingDuration(new Period(1000))
      .setNumEventsToTrack(10)
      .setPendingTaskTimeout(new Period(0))
      .setWorkerVersion(MIN_VERSION)
      .setMaxScalingStep(2);

  final WorkerBehaviorConfig behaviorConfig = new WorkerBehaviorConfig(
      new FillCapacityWorkerSelectStrategy(),
      autoScaler
  );
  workerConfig = new AtomicReference<>(behaviorConfig);

  final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig();
  strategy = new PendingTaskBasedWorkerResourceManagementStrategy(
      config,
      DSuppliers.of(workerConfig),
      schedulerConfig,
      executorService
  );
}
/**
 * With no workers and no pending tasks, and an autoscaler minimum of 3, a provisioning pass is
 * expected to call {@code provision()} three times and record three PROVISION events.
 */
@Test
public void testSuccessfulInitialMinWorkersProvision() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList());
  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  // No pending tasks
  EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
      Lists.<Task>newArrayList()
  );
  // No workers running
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.<ImmutableWorkerInfo>asList(
      )
  );
  EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("aNode"))
  ).times(3);
  EasyMock.replay(runner, autoScaler);
  boolean provisionedSomething = strategy.doProvision(runner);
  Assert.assertTrue(provisionedSomething);
  // assertEquals reports the actual value when the expectation is not met
  Assert.assertEquals(3, strategy.getStats().toList().size());
  for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
    Assert.assertEquals(ScalingStats.EVENT.PROVISION, event.getEvent());
  }
}
/**
 * With one worker already running, no pending tasks, and an autoscaler minimum of 3, a
 * provisioning pass is expected to call {@code provision()} twice and record two PROVISION events.
 */
@Test
public void testSuccessfulMinWorkersProvision() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList());
  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  // No pending tasks
  EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
      Lists.<Task>newArrayList()
  );
  // 1 node already running, only provision 2 more.
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.asList(
          new TestZkWorker(testTask).toImmutable()
      )
  );
  EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("aNode"))
  ).times(2);
  EasyMock.replay(runner, autoScaler);
  boolean provisionedSomething = strategy.doProvision(runner);
  Assert.assertTrue(provisionedSomething);
  // assertEquals reports the actual value when the expectation is not met
  Assert.assertEquals(2, strategy.getStats().toList().size());
  for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
    Assert.assertEquals(ScalingStats.EVENT.PROVISION, event.getEvent());
  }
}
/**
 * With one MIN_VERSION worker and one INVALID_VERSION worker running, no pending tasks, and an
 * autoscaler minimum of 3, a provisioning pass is expected to call {@code provision()} twice
 * and record two PROVISION events.
 */
@Test
public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList());
  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  // No pending tasks
  EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
      Lists.<Task>newArrayList()
  );
  // 1 valid node plus 1 invalid-version node running, only provision 2 more.
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.<ImmutableWorkerInfo>asList(
          new TestZkWorker(testTask).toImmutable(),
          new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
      )
  );
  EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("aNode"))
  ).times(2);
  EasyMock.replay(runner, autoScaler);
  boolean provisionedSomething = strategy.doProvision(runner);
  Assert.assertTrue(provisionedSomething);
  // assertEquals reports the actual value when the expectation is not met
  Assert.assertEquals(2, strategy.getStats().toList().size());
  for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
    Assert.assertEquals(ScalingStats.EVENT.PROVISION, event.getEvent());
  }
}
/**
 * Runs two provisioning passes with one pending task. The first pass is expected to provision
 * one node; the second is expected to provision nothing and leave the recorded PROVISION event
 * (and its timestamp) unchanged.
 */
@Test
public void testSomethingProvisioning() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList()).times(2);
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("fake"))
  );
  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
      Arrays.<Task>asList(
          NoopTask.create()
      )
  ).times(2);
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.<ImmutableWorkerInfo>asList(
          new TestZkWorker(testTask).toImmutable(),
          new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
      )
  ).times(2);
  EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
  EasyMock.replay(runner);
  EasyMock.replay(autoScaler);

  // First pass: one node gets provisioned.
  boolean provisionedSomething = strategy.doProvision(runner);
  Assert.assertTrue(provisionedSomething);
  Assert.assertEquals(1, strategy.getStats().toList().size());
  DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp();
  Assert.assertEquals(
      ScalingStats.EVENT.PROVISION,
      strategy.getStats().toList().get(0).getEvent()
  );

  // Second pass: nothing new is provisioned and the first event is still the one recorded above.
  provisionedSomething = strategy.doProvision(runner);
  Assert.assertFalse(provisionedSomething);
  Assert.assertEquals(
      ScalingStats.EVENT.PROVISION,
      strategy.getStats().toList().get(0).getEvent()
  );
  DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp();
  Assert.assertEquals(createdTime, anotherCreatedTime);

  EasyMock.verify(autoScaler);
  EasyMock.verify(runner);
}
/**
 * Runs two provisioning passes separated by a two second pause. The first pass is expected to
 * provision one node; the second is expected to provision nothing, emit one event through the
 * registered emitter and call {@code terminateWithIds} on the autoscaler.
 */
@Test
public void testProvisionAlert() throws Exception
{
  ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
  EmittingLogger.registerEmitter(emitter);
  emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
  EasyMock.expectLastCall();
  EasyMock.replay(emitter);

  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList()).times(2);
  EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject()))
          .andReturn(null);
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("fake"))
  );
  EasyMock.replay(autoScaler);

  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
      Arrays.<Task>asList(
          NoopTask.create()
      )
  ).times(2);
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.asList(
          new TestZkWorker(testTask, "hi", "lo", MIN_VERSION, 1).toImmutable(),
          new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable(), // Invalid version node
          new TestZkWorker(testTask, "h2", "n1", INVALID_VERSION).toImmutable() // Invalid version node
      )
  ).times(2);
  EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
  EasyMock.replay(runner);

  boolean provisionedSomething = strategy.doProvision(runner);
  Assert.assertTrue(provisionedSomething);
  // assertEquals reports the actual value when the expectation is not met
  Assert.assertEquals(1, strategy.getStats().toList().size());
  DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp();
  Assert.assertEquals(
      ScalingStats.EVENT.PROVISION,
      strategy.getStats().toList().get(0).getEvent()
  );

  // NOTE(review): presumably waits past the 1000 ms max scaling duration configured in setUp so
  // that the second pass treats the outstanding provision as overdue - confirm against the strategy.
  Thread.sleep(2000);

  provisionedSomething = strategy.doProvision(runner);
  Assert.assertFalse(provisionedSomething);
  Assert.assertEquals(
      ScalingStats.EVENT.PROVISION,
      strategy.getStats().toList().get(0).getEvent()
  );
  DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp();
  Assert.assertEquals(createdTime, anotherCreatedTime);

  EasyMock.verify(autoScaler);
  EasyMock.verify(emitter);
  EasyMock.verify(runner);
}
/**
 * With one worker that the runner marks lazy, a termination pass is expected to call
 * {@code terminate} on the autoscaler and record a single TERMINATE event.
 */
@Test
public void testDoSuccessfulTerminate() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList());
  EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
      new AutoScalingData(Lists.<String>newArrayList())
  );
  EasyMock.replay(autoScaler);

  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  // NOTE(review): the runner mock is never verified in this test, so the call counts recorded
  // on it below are not enforced - confirm which of these doTerminate actually uses.
  EasyMock.expect(runner.getPendingTasks()).andReturn(
      Arrays.asList(
          new RemoteTaskRunnerWorkItem(
              testTask.getId(),
              null,
              TaskLocation.unknown()
          ).withQueueInsertionTime(new DateTime())
      )
  ).times(2);
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.asList(
          new TestZkWorker(testTask).toImmutable()
      )
  ).times(2);
  EasyMock.expect(
      runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt())
  ).andReturn(
      Arrays.<Worker>asList(
          new TestZkWorker(testTask).getWorker()
      )
  );
  EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
  EasyMock.replay(runner);

  boolean terminatedSomething = strategy.doTerminate(runner);
  Assert.assertTrue(terminatedSomething);
  // assertEquals reports the actual value when the expectation is not met
  Assert.assertEquals(1, strategy.getStats().toList().size());
  Assert.assertEquals(
      ScalingStats.EVENT.TERMINATE,
      strategy.getStats().toList().get(0).getEvent()
  );

  EasyMock.verify(autoScaler);
}
/**
 * Runs two termination passes. The first is expected to terminate one node and record a
 * TERMINATE event; the second is expected to terminate nothing and leave the stats unchanged.
 */
@Test
public void testSomethingTerminating() throws Exception
{
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip")).times(2);
  EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("ip"))
  );
  EasyMock.replay(autoScaler);

  RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
  EasyMock.expect(runner.getWorkers()).andReturn(
      Arrays.asList(
          new TestZkWorker(testTask).toImmutable()
      )
  ).times(2);
  EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
  EasyMock.expect(
      runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt())
  ).andReturn(
      Arrays.asList(
          new TestZkWorker(testTask).toImmutable().getWorker()
      )
  );
  EasyMock.replay(runner);

  // First pass: one node is terminated.
  boolean terminatedSomething = strategy.doTerminate(runner);
  Assert.assertTrue(terminatedSomething);
  Assert.assertEquals(1, strategy.getStats().toList().size());
  Assert.assertEquals(
      ScalingStats.EVENT.TERMINATE,
      strategy.getStats().toList().get(0).getEvent()
  );

  // Second pass: nothing further is terminated and no new event is recorded.
  terminatedSomething = strategy.doTerminate(runner);
  Assert.assertFalse(terminatedSomething);
  Assert.assertEquals(1, strategy.getStats().toList().size());
  Assert.assertEquals(
      ScalingStats.EVENT.TERMINATE,
      strategy.getStats().toList().get(0).getEvent()
  );

  EasyMock.verify(autoScaler);
  EasyMock.verify(runner);
}
/**
 * With two valid workers running and one pending task, a termination pass followed by a
 * provisioning pass are both expected to report that nothing was done.
 */
@Test
public void testNoActionNeeded() throws Exception
{
  // Termination phase expectations on the autoscaler.
  EasyMock.reset(autoScaler);
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScaler);

  final RemoteTaskRunner taskRunner = EasyMock.createMock(RemoteTaskRunner.class);
  final List<Task> pendingPayloads = Arrays.<Task>asList(NoopTask.create());
  EasyMock.expect(taskRunner.getPendingTaskPayloads()).andReturn(pendingPayloads).times(1);
  final List<ImmutableWorkerInfo> runningWorkers = Arrays.asList(
      new TestZkWorker(NoopTask.create()).toImmutable(),
      new TestZkWorker(NoopTask.create()).toImmutable()
  );
  EasyMock.expect(taskRunner.getWorkers()).andReturn(runningWorkers).times(2);
  EasyMock.expect(taskRunner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
  EasyMock.expect(taskRunner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
  EasyMock.expect(
      taskRunner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt())
  ).andReturn(Collections.<Worker>emptyList());
  EasyMock.replay(taskRunner);

  final boolean didTerminate = strategy.doTerminate(taskRunner);
  Assert.assertFalse(didTerminate);
  EasyMock.verify(autoScaler);

  // Provisioning phase expectations on the autoscaler.
  EasyMock.reset(autoScaler);
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScaler);

  final boolean didProvision = strategy.doProvision(taskRunner);
  Assert.assertFalse(didProvision);

  EasyMock.verify(autoScaler);
  EasyMock.verify(taskRunner);
}
/**
 * Runs a termination pass and a provisioning pass with a minimum of 0 workers (both expected to
 * do nothing), then raises the autoscaler minimum to 3 and expects the next provisioning pass
 * to call {@code provision()} twice.
 */
@Test
public void testMinCountIncrease() throws Exception
{
  // Don't terminate anything
  EasyMock.reset(autoScaler);
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScaler);

  final RemoteTaskRunner taskRunner = EasyMock.createMock(RemoteTaskRunner.class);
  EasyMock.expect(taskRunner.getPendingTaskPayloads()).andReturn(Arrays.<Task>asList()).times(2);
  final List<ImmutableWorkerInfo> runningWorkers = Arrays.asList(
      new TestZkWorker(NoopTask.create(), "h1", "i1", MIN_VERSION).toImmutable()
  );
  EasyMock.expect(taskRunner.getWorkers()).andReturn(runningWorkers).times(3);
  EasyMock.expect(taskRunner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(2);
  EasyMock.expect(taskRunner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
  EasyMock.expect(
      taskRunner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt())
  ).andReturn(Collections.<Worker>emptyList());
  EasyMock.replay(taskRunner);

  final boolean didTerminate = strategy.doTerminate(taskRunner);
  Assert.assertFalse(didTerminate);
  EasyMock.verify(autoScaler);

  // Don't provision anything
  EasyMock.reset(autoScaler);
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScaler);

  final boolean firstProvision = strategy.doProvision(taskRunner);
  Assert.assertFalse(firstProvision);
  EasyMock.verify(autoScaler);

  // Increase minNumWorkers
  EasyMock.reset(autoScaler);
  EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
  EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
  EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  // Should provision two new workers
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("h3"))
  );
  EasyMock.expect(autoScaler.provision()).andReturn(
      new AutoScalingData(Lists.<String>newArrayList("h4"))
  );
  EasyMock.replay(autoScaler);

  final boolean secondProvision = strategy.doProvision(taskRunner);
  Assert.assertTrue(secondProvision);

  EasyMock.verify(autoScaler);
  EasyMock.verify(taskRunner);
}
@Test
public void testNullWorkerConfig() throws Exception
{
  // With the worker behavior config cleared, neither terminate nor provision should act.
  workerConfig.set(null);
  EasyMock.replay(autoScaler);

  // A single pending no-op task and a single worker that is not running anything.
  final List<Task> pendingPayloads = Arrays.<Task>asList(NoopTask.create());
  final RemoteTaskRunner mockRunner = EasyMock.createMock(RemoteTaskRunner.class);
  EasyMock.expect(mockRunner.getPendingTaskPayloads())
          .andReturn(pendingPayloads)
          .once();
  EasyMock.expect(mockRunner.getWorkers())
          .andReturn(Arrays.asList(new TestZkWorker(null).toImmutable()))
          .times(2);
  EasyMock.replay(mockRunner);

  // Run both strategy entry points before asserting, so both are always exercised.
  final boolean didTerminate = strategy.doTerminate(mockRunner);
  final boolean didProvision = strategy.doProvision(mockRunner);

  Assert.assertFalse(didTerminate);
  Assert.assertFalse(didProvision);

  // The auto scaler had no expectations recorded, so verify confirms it was never touched.
  EasyMock.verify(autoScaler);
  EasyMock.verify(mockRunner);
}
/**
 * ZkWorker test double whose set of running tasks is fixed at construction time:
 * either exactly one task (reported as running) or none when the task is null.
 */
private static class TestZkWorker extends ZkWorker
{
  private final Task assignedTask;

  /** Creates a worker on "host"/"ip" at MIN_VERSION with the default capacity. */
  public TestZkWorker(
      Task testTask
  )
  {
    this(testTask, "host", "ip", MIN_VERSION);
  }

  /** Creates a worker with the given host, ip and version and a capacity of 1. */
  public TestZkWorker(
      Task testTask,
      String host,
      String ip,
      String version
  )
  {
    this(testTask, host, ip, version, 1);
  }

  /** Creates a worker with every attribute given explicitly. */
  public TestZkWorker(
      Task testTask,
      String host,
      String ip,
      String version,
      int capacity
  )
  {
    super(new Worker(host, ip, capacity, version), null, new DefaultObjectMapper());
    this.assignedTask = testTask;
  }

  /**
   * @return a single-entry map keyed by the task id with a "running" announcement,
   *         or a fresh empty map when this worker was built without a task
   */
  @Override
  public Map<String, TaskAnnouncement> getRunningTasks()
  {
    if (assignedTask != null) {
      final String taskId = assignedTask.getId();
      final TaskAnnouncement announcement = TaskAnnouncement.create(
          assignedTask,
          TaskStatus.running(taskId),
          TaskLocation.unknown()
      );
      return ImmutableMap.of(taskId, announcement);
    }
    return Maps.newHashMap();
  }
}
}

View File

@ -62,7 +62,7 @@ public class SimpleResourceManagementStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private SimpleWorkerResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
@ -72,7 +72,7 @@ public class SimpleResourceManagementStrategyTest
autoScaler = EasyMock.createMock(AutoScaler.class);
testTask = TestTasks.immediateSuccess("task1");
final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig()
final SimpleWorkerResourceManagementConfig simpleWorkerResourceManagementConfig = new SimpleWorkerResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
@ -88,8 +88,8 @@ public class SimpleResourceManagementStrategyTest
)
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
simpleResourceManagementConfig,
simpleResourceManagementStrategy = new SimpleWorkerResourceManagementStrategy(
simpleWorkerResourceManagementConfig,
DSuppliers.of(workerConfig),
schedulerConfig,
executorService

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
@ -39,7 +38,6 @@ import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
@ -69,7 +67,7 @@ public class LoadQueuePeon
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor;
private final ScheduledExecutorService processingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
@ -92,7 +90,7 @@ public class LoadQueuePeon
CuratorFramework curator,
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService zkWritingExecutor,
ScheduledExecutorService processingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
@ -101,7 +99,7 @@ public class LoadQueuePeon
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.zkWritingExecutor = zkWritingExecutor;
this.processingExecutor = processingExecutor;
this.config = config;
}
@ -202,7 +200,7 @@ public class LoadQueuePeon
return;
}
zkWritingExecutor.execute(
processingExecutor.execute(
new Runnable()
{
@Override
@ -225,7 +223,7 @@ public class LoadQueuePeon
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
zkWritingExecutor.schedule(
processingExecutor.schedule(
new Runnable()
{
@Override

View File

@ -61,10 +61,12 @@ import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource;
@ -144,8 +146,8 @@ public class CliOverlord extends ServerRunnable
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
configureTaskStorage(binder);
configureRunners(binder);
configureAutoscale(binder);
configureRunners(binder);
binder.bind(AuditManager.class)
.toProvider(AuditManagerProvider.class)
@ -207,11 +209,26 @@ public class CliOverlord extends ServerRunnable
// Registers the autoscaling config classes and the available ResourceManagementStrategy
// implementations on the given binder.
// NOTE(review): this span looks like a diff hunk rendered without +/- markers, so it appears to
// contain both the earlier binding (SimpleResourceManagementStrategy / SimpleResourceManagementConfig)
// and its replacement (the PolyBind choice over the *Worker* classes) — confirm which lines belong
// to the final file before relying on this method as written.
private void configureAutoscale(Binder binder)
{
// Every config class in this method is bound with the same "druid.indexer.autoscale" string.
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
// Direct binding of the strategy type to SimpleResourceManagementStrategy in LazySingleton scope.
// NOTE(review): presumably the pre-change binding superseded by the PolyBind choice below — verify.
binder.bind(new TypeLiteral<ResourceManagementStrategy<WorkerTaskRunner>>(){})
.to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(
binder,
"druid.indexer.autoscale",
PendingTaskBasedWorkerResourceManagementConfig.class
);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerResourceManagementConfig.class);
// Presumably selects the ResourceManagementStrategy implementation from the value of
// "druid.indexer.autoscale.strategy.type", falling back to SimpleWorkerResourceManagementStrategy
// when unset — TODO confirm against PolyBind.createChoice's parameter contract.
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy.type",
Key.get(ResourceManagementStrategy.class),
Key.get(SimpleWorkerResourceManagementStrategy.class)
);
final MapBinder<String, ResourceManagementStrategy> biddy = PolyBind.optionBinder(
binder,
Key.get(ResourceManagementStrategy.class)
);
// Named options: "simple" and "pendingTaskBased" map to their respective strategy classes.
biddy.addBinding("simple").to(SimpleWorkerResourceManagementStrategy.class);
biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class);
// NOTE(review): SimpleResourceManagementConfig looks like the pre-rename name of
// SimpleWorkerResourceManagementConfig (bound above) — likely a removed line; verify.
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
},
new IndexingServiceFirehoseModule(),