Fix RemoteTaskRunner's auto-scaling (#3768)

* Rename ResourceManagementStrategy to ProvisioningStrategy, and rename related classes similarly. Make ProvisioningService non-global; create it per RemoteTaskRunner instead. Add OverlordBlinkLeadershipTest.

* Fix RemoteTaskRunnerFactoryTest.testExecNotSharedBetweenRunners()

* Small fix

* Make SimpleProvisioner and PendingProvisioner more similar in details

* Fix executor name

* Style fixes

* Use LifecycleLock in RemoteTaskRunner
Roman Leventov 2017-07-14 03:11:39 +03:00 committed by Jihoon Son
parent c5c17bb803
commit b7203510b8
25 changed files with 785 additions and 593 deletions
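
The heart of the change, in sketch form: the provisioning machinery is no longer a global, lifecycle-managed service. Instead, the injected ProvisioningStrategy creates a ProvisioningService inside each task runner's lifecycle start, and the runner closes it in its lifecycle stop. The skeleton below is illustrative only (the class and method bodies are not from this patch; only the ProvisioningStrategy/ProvisioningService calls mirror it):

// Hypothetical runner skeleton showing how the renamed API is driven per runner instance.
class ExampleWorkerTaskRunner
{
  private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
  private ProvisioningService provisioningService;

  ExampleWorkerTaskRunner(ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy)
  {
    this.provisioningStrategy = provisioningStrategy;
  }

  void start(WorkerTaskRunner self)
  {
    // Created per runner, in lifecycle start (replaces the old startManagement(runner)).
    provisioningService = provisioningStrategy.makeProvisioningService(self);
  }

  void stop()
  {
    // Closed in lifecycle stop (replaces the old stopManagement()).
    provisioningService.close();
  }
}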


@ -212,7 +212,8 @@ public final class LifecycleLock
/**
* Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called
* before that.
* before that. Returns {@code false} if {@link #started()} was not called before {@link #exitStart()}, or if
* {@link #canStop()} has already been called on this LifecycleLock.
*/
public boolean awaitStarted()
{
@ -222,7 +223,8 @@ public final class LifecycleLock
/**
* Awaits until {@link #exitStart()} is called for at most the specified timeout, and returns {@code true} if {@link
* #started()} was called before that. Returns {@code false} if {@code started()} wasn't called before {@code
* exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires.
* exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock before the specified timeout expires, or
* if {@link #canStop()} has already been called on this LifecycleLock.
*/
public boolean awaitStarted(long timeout, TimeUnit unit)
{
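
A usage sketch of the contract described above (illustrative only; the RemoteTaskRunner changes below follow exactly this pattern): the starting thread brackets its startup work with canStart()/started()/exitStart(), and other threads gate their work on awaitStarted() with a timeout.

// Starting thread:
if (!lifecycleLock.canStart()) {
  return;
}
try {
  // ... startup work ...
  lifecycleLock.started();
}
finally {
  lifecycleLock.exitStart();
}

// Any other thread, e.g. handling a shutdown request:
if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
  return; // never started successfully, or canStop() was already called
}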


@ -51,12 +51,14 @@ import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningService;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
@ -172,12 +174,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final Object statusLock = new Object();
private volatile boolean started = false;
private final LifecycleLock lifecycleLock = new LifecycleLock();
private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private ProvisioningService provisioningService;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
@ -188,7 +191,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
this.jsonMapper = jsonMapper;
@ -205,7 +208,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.resourceManagement = resourceManagement;
this.provisioningStrategy = provisioningStrategy;
this.runPendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
@ -216,11 +219,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@LifecycleStart
public void start()
{
if (!lifecycleLock.canStart()) {
return;
}
try {
if (started) {
return;
}
final MutableInt waitingFor = new MutableInt(1);
final Object waitingForMonitor = new Object();
@ -327,25 +329,26 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
scheduleBlackListedNodesCleanUp();
resourceManagement.startManagement(this);
started = true;
provisioningService = provisioningStrategy.makeProvisioningService(this);
lifecycleLock.started();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycleLock.exitStart();
}
}
@Override
@LifecycleStop
public void stop()
{
if (!lifecycleLock.canStop()) {
return;
}
try {
if (!started) {
return;
}
started = false;
resourceManagement.stopManagement();
provisioningService.close();
Closer closer = Closer.create();
for (ZkWorker zkWorker : zkWorkers.values()) {
@ -452,7 +455,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.fromNullable(resourceManagement.getStats());
return Optional.fromNullable(provisioningService.getStats());
}
public ZkWorker findWorkerRunningTask(String taskId)
@ -510,8 +513,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public void shutdown(final String taskId)
{
if (!started) {
log.info("This TaskRunner is stopped. Ignoring shutdown command for task: %s", taskId);
if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
} else if (pendingTasks.remove(taskId) != null) {
pendingTaskPayloads.remove(taskId);
log.info("Removed task from pending queue: %s", taskId);
@ -693,7 +696,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
*/
private void cleanup(final String taskId)
{
if (!started) {
if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
return;
}
final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
@ -1259,7 +1262,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
return getWorkerFromZK(lazyWorkers.values());
}
}
@ -1288,7 +1291,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public Collection<Worker> getLazyWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
return getWorkerFromZK(lazyWorkers.values());
}
private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
@ -1308,18 +1311,20 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
private static ImmutableList<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
@Override
public Worker apply(ZkWorker input)
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
return input.getWorker();
@Override
public Worker apply(ZkWorker input)
{
return input.getWorker();
}
}
}
)
);
}


@ -25,9 +25,9 @@ import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -45,8 +45,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ScheduledExecutorFactory factory;
@Inject
@ -58,8 +58,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ScheduledExecutorFactory factory,
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
final ResourceManagementStrategy resourceManagementStrategy
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
)
{
this.curator = curator;
@ -68,8 +68,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.resourceManagementStrategy = resourceManagementStrategy;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.factory = factory;
}
@ -85,9 +85,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),
resourceManagementSchedulerConfig.isDoAutoscale()
? resourceManagementStrategy
: new NoopResourceManagementStrategy<>()
provisioningSchedulerConfig.isDoAutoscale()
? provisioningStrategy
: new NoopProvisioningStrategy<>()
);
}
}


@ -0,0 +1,131 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Supplier;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
*/
public abstract class AbstractWorkerProvisioningStrategy implements ProvisioningStrategy<WorkerTaskRunner>
{
private static final EmittingLogger log = new EmittingLogger(AbstractWorkerProvisioningStrategy.class);
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final Supplier<ScheduledExecutorService> execFactory;
AbstractWorkerProvisioningStrategy(
ProvisioningSchedulerConfig provisioningSchedulerConfig,
Supplier<ScheduledExecutorService> execFactory
)
{
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.execFactory = execFactory;
}
@Override
public ProvisioningService makeProvisioningService(WorkerTaskRunner runner)
{
return new WorkerProvisioningService(makeProvisioner(runner));
}
final class WorkerProvisioningService implements ProvisioningService
{
private final ScheduledExecutorService exec = execFactory.get();
private final Provisioner provisioner;
WorkerProvisioningService(final Provisioner provisioner)
{
log.info("Started Resource Management Scheduler");
this.provisioner = provisioner;
long rate = provisioningSchedulerConfig.getProvisionPeriod().toStandardDuration().getMillis();
exec.scheduleAtFixedRate(
new Runnable()
{
@Override
public void run()
{
try {
provisioner.doProvision();
}
catch (Exception e) {
log.error(e, "Uncaught exception.");
}
}
},
rate,
rate,
TimeUnit.MILLISECONDS
);
// Schedule termination of worker nodes periodically
Period period = provisioningSchedulerConfig.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(
period,
provisioningSchedulerConfig.getOriginTime(),
null
);
final long startTime = granularity.bucketEnd(new DateTime()).getMillis();
exec.scheduleAtFixedRate(
new Runnable()
{
@Override
public void run()
{
try {
provisioner.doTerminate();
}
catch (Exception e) {
log.error(e, "Uncaught exception.");
}
}
},
new Duration(System.currentTimeMillis(), startTime).getMillis(),
provisioningSchedulerConfig.getTerminatePeriod().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
public ScalingStats getStats()
{
return provisioner.getStats();
}
@Override
public void close()
{
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
}
}
protected abstract Provisioner makeProvisioner(WorkerTaskRunner runner);
}
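
A note on the termination schedule above: the first doTerminate() run is aligned to the configured origin time rather than to when the service was created, because the initial delay is computed from the end of the terminatePeriod bucket that contains "now". A small sketch with made-up values (assuming the joda-time and PeriodGranularity imports already present in this file):

// Hypothetical configuration: terminate every 90 minutes, buckets anchored at a fixed origin.
Period period = new Period("PT90M");
PeriodGranularity granularity = new PeriodGranularity(period, new DateTime("2017-01-01T00:00:00Z"), null);
// End of the current 90-minute bucket, i.e. the first time doTerminate() should fire.
long firstRunMillis = granularity.bucketEnd(new DateTime()).getMillis();
// Initial delay handed to scheduleAtFixedRate(); subsequent runs repeat every terminatePeriod.
long initialDelayMillis = new Duration(System.currentTimeMillis(), firstRunMillis).getMillis();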


@ -1,121 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.LifecycleLock;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class AbstractWorkerResourceManagementStrategy implements ResourceManagementStrategy<WorkerTaskRunner>
{
private static final EmittingLogger log = new EmittingLogger(AbstractWorkerResourceManagementStrategy.class);
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;
private final LifecycleLock lifecycleLock = new LifecycleLock();
protected AbstractWorkerResourceManagementStrategy(
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
)
{
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.exec = exec;
}
@Override
public void startManagement(final WorkerTaskRunner runner)
{
if (!lifecycleLock.canStart()) {
return;
}
try {
log.info("Started Resource Management Scheduler");
ScheduledExecutors.scheduleAtFixedRate(
exec,
resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
// Any Errors are caught by ScheduledExecutors
doProvision(runner);
}
}
);
// Schedule termination of worker nodes periodically
Period period = resourceManagementSchedulerConfig.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(
period,
resourceManagementSchedulerConfig.getOriginTime(),
null
);
final long startTime = granularity.bucketEnd(new DateTime()).getMillis();
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(System.currentTimeMillis(), startTime),
resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
// Any Errors are caught by ScheduledExecutors
doTerminate(runner);
}
}
);
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
abstract boolean doTerminate(WorkerTaskRunner runner);
abstract boolean doProvision(WorkerTaskRunner runner);
@Override
public void stopManagement()
{
if (!lifecycleLock.canStop()) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
}
}


@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.TaskRunner;
public class NoopProvisioningStrategy<T extends TaskRunner> implements ProvisioningStrategy<T>
{
@Override
public ProvisioningService makeProvisioningService(T runner)
{
return new ProvisioningService()
{
@Override
public void close()
{
// nothing to close
}
@Override
public ScalingStats getStats()
{
return null;
}
};
}
}


@ -24,7 +24,7 @@ import org.joda.time.Period;
/**
*/
public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorkerResourceManagementConfig
public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvisioningConfig
{
@JsonProperty
private int maxScalingStep = 10;
@ -35,42 +35,42 @@ public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorker
return maxScalingStep;
}
public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingStep(int maxScalingStep)
public PendingTaskBasedWorkerProvisioningConfig setMaxScalingStep(int maxScalingStep)
{
this.maxScalingStep = maxScalingStep;
return this;
}
@Override
public PendingTaskBasedWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
public PendingTaskBasedWorkerProvisioningConfig setWorkerIdleTimeout(Period workerIdleTimeout)
{
super.setWorkerIdleTimeout(workerIdleTimeout);
return this;
}
@Override
public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
public PendingTaskBasedWorkerProvisioningConfig setMaxScalingDuration(Period maxScalingDuration)
{
super.setMaxScalingDuration(maxScalingDuration);
return this;
}
@Override
public PendingTaskBasedWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
public PendingTaskBasedWorkerProvisioningConfig setNumEventsToTrack(int numEventsToTrack)
{
super.setNumEventsToTrack(numEventsToTrack);
return this;
}
@Override
public PendingTaskBasedWorkerResourceManagementConfig setWorkerVersion(String workerVersion)
public PendingTaskBasedWorkerProvisioningConfig setWorkerVersion(String workerVersion)
{
super.setWorkerVersion(workerVersion);
return this;
}
@Override
public PendingTaskBasedWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pendingTaskTimeout)
{
super.setPendingTaskTimeout(pendingTaskTimeout);
return this;


@ -27,11 +27,11 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
@ -40,11 +40,10 @@ import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -53,58 +52,76 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy
public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerProvisioningStrategy
{
private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class);
private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class);
private static final String SCHEME = "http";
private final PendingTaskBasedWorkerResourceManagementConfig config;
private final PendingTaskBasedWorkerProvisioningConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
private final Object lock = new Object();
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
@Inject
public PendingTaskBasedWorkerResourceManagementStrategy(
PendingTaskBasedWorkerResourceManagementConfig config,
public PendingTaskBasedWorkerProvisioningStrategy(
PendingTaskBasedWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorFactory factory
ProvisioningSchedulerConfig provisioningSchedulerConfig
)
{
this(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
factory.create(1, "PendingTaskBasedResourceManagement-manager--%d")
provisioningSchedulerConfig,
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return ScheduledExecutors.fixed(1, "PendingTaskBasedWorkerProvisioning-manager--%d");
}
}
);
}
public PendingTaskBasedWorkerResourceManagementStrategy(
PendingTaskBasedWorkerResourceManagementConfig config,
public PendingTaskBasedWorkerProvisioningStrategy(
PendingTaskBasedWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
ProvisioningSchedulerConfig provisioningSchedulerConfig,
Supplier<ScheduledExecutorService> execFactory
)
{
super(resourceManagementSchedulerConfig, exec);
super(provisioningSchedulerConfig, execFactory);
this.config = config;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
@Override
public boolean doProvision(WorkerTaskRunner runner)
public Provisioner makeProvisioner(WorkerTaskRunner runner)
{
Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
synchronized (lock) {
return new PendingProvisioner(runner);
}
private class PendingProvisioner implements Provisioner
{
private final WorkerTaskRunner runner;
private final ScalingStats scalingStats = new ScalingStats(config.getNumEventsToTrack());
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
private PendingProvisioner(WorkerTaskRunner runner)
{
this.runner = runner;
}
@Override
public synchronized boolean doProvision()
{
Collection<Task> pendingTasks = runner.getPendingTaskPayloads();
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null || workerConfig.getAutoScaler() == null) {
@ -136,8 +153,8 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
);
while (want > 0) {
final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes = provisioned == null ? ImmutableList.<String>of() : provisioned.getNodeIds();
if (newNodes.isEmpty()) {
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
log.warn("NewNodes is empty, returning from provision loop");
break;
} else {
@ -164,111 +181,104 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
return didProvision;
}
}
private static Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig)
{
return workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workers,
new Function<Worker, String>()
{
@Override
public String apply(Worker input)
{
return input.getIp();
}
}
)
)
);
}
int getScaleUpNodeCount(
final WorkerTaskRunnerConfig remoteTaskRunnerConfig,
final WorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers
)
{
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need since
// we are not aware of the expectedWorkerCapacity.
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
workers
);
int want = Math.max(
minWorkerCount - currValidWorkers,
// Additional workers needed to reach minWorkerCount
Math.min(config.getMaxScalingStep(), moreWorkersNeeded)
// Additional workers needed to run current pending tasks
);
if (want > 0 && currValidWorkers >= maxWorkerCount) {
log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount);
return 0;
}
want = Math.min(want, maxWorkerCount - currValidWorkers);
return want;
}
int getWorkersNeededToAssignTasks(
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final WorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
workers,
ResourceManagementUtil.createValidWorkerPredicate(config)
);
Map<String, ImmutableWorkerInfo> workersMap = Maps.newHashMap();
for (ImmutableWorkerInfo worker : validWorkers) {
workersMap.put(worker.getWorker().getHost(), worker);
}
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
int need = 0;
int capacity = getExpectedWorkerCapacity(workers);
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
// the number of additional workers needed to assign all the pending tasks is noted
for (Task task : pendingTasks) {
Optional<ImmutableWorkerInfo> selectedWorker = workerSelectStrategy.findWorkerForTask(
workerTaskRunnerConfig,
ImmutableMap.copyOf(workersMap),
task
);
final ImmutableWorkerInfo workerRunningTask;
if (selectedWorker.isPresent()) {
workerRunningTask = selectedWorker.get();
} else {
// None of the existing worker can run this task, we need to provision one worker for it.
// create a dummy worker and try to simulate assigning task to it.
workerRunningTask = createDummyWorker(SCHEME, "dummy" + need, capacity, workerTaskRunnerConfig.getMinWorkerVersion());
need++;
private Collection<String> getWorkerNodeIDs(Collection<Worker> workers, WorkerBehaviorConfig workerConfig)
{
List<String> ips = new ArrayList<>(workers.size());
for (Worker worker : workers) {
ips.add(worker.getIp());
}
// Update map with worker running task
workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task));
return workerConfig.getAutoScaler().ipToIdLookup(ips);
}
return need;
}
@Override
public boolean doTerminate(WorkerTaskRunner runner)
{
Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
synchronized (lock) {
private int getScaleUpNodeCount(
final WorkerTaskRunnerConfig remoteTaskRunnerConfig,
final WorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers
)
{
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
// If there are no workers, spin up minWorkerCount; we cannot determine the exact capacity needed here
// since we are not aware of the expectedWorkerCapacity.
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
workers
);
int want = Math.max(
minWorkerCount - currValidWorkers,
// Additional workers needed to reach minWorkerCount
Math.min(config.getMaxScalingStep(), moreWorkersNeeded)
// Additional workers needed to run current pending tasks
);
if (want > 0 && currValidWorkers >= maxWorkerCount) {
log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount);
return 0;
}
want = Math.min(want, maxWorkerCount - currValidWorkers);
return want;
}
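// Worked example of the scale-up arithmetic above (hypothetical numbers, not from this patch):
// with minWorkerCount = 2, maxWorkerCount = 10, currValidWorkers = 3,
// moreWorkersNeeded = 7 and maxScalingStep = 5:
//   want = max(2 - 3, min(5, 7)) = max(-1, 5) = 5
//   currValidWorkers (3) < maxWorkerCount (10), so the early return does not trigger
//   want = min(5, 10 - 3) = 5, i.e. provision five more workers this cycle.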
private int getWorkersNeededToAssignTasks(
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final WorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
workers,
ProvisioningUtil.createValidWorkerPredicate(config)
);
Map<String, ImmutableWorkerInfo> workersMap = Maps.newHashMap();
for (ImmutableWorkerInfo worker : validWorkers) {
workersMap.put(worker.getWorker().getHost(), worker);
}
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
int need = 0;
int capacity = getExpectedWorkerCapacity(workers);
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
// the number of additional workers needed to assign all the pending tasks is noted
for (Task task : pendingTasks) {
Optional<ImmutableWorkerInfo> selectedWorker = workerSelectStrategy.findWorkerForTask(
workerTaskRunnerConfig,
ImmutableMap.copyOf(workersMap),
task
);
final ImmutableWorkerInfo workerRunningTask;
if (selectedWorker.isPresent()) {
workerRunningTask = selectedWorker.get();
} else {
// None of the existing workers can run this task, so we need to provision one worker for it:
// create a dummy worker and simulate assigning the task to it.
workerRunningTask = createDummyWorker(
SCHEME,
"dummy" + need,
capacity,
workerTaskRunnerConfig.getMinWorkerVersion()
);
need++;
}
// Update map with worker running task
workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task));
}
return need;
}
@Override
public synchronized boolean doTerminate()
{
Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot terminate workers.");
@ -293,20 +303,18 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
if (currentlyTerminating.isEmpty()) {
final int maxWorkersToTerminate = maxWorkersToTerminate(zkWorkers, workerConfig);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
Lists.newArrayList(
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate),
new Function<Worker, String>()
{
@Override
public String apply(Worker zkWorker)
{
return zkWorker.getIp();
}
}
)
final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
final Collection<String> laziestWorkerIps =
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate),
new Function<Worker, String>()
{
@Override
public String apply(Worker zkWorker)
{
return zkWorker.getIp();
}
}
);
if (laziestWorkerIps.isEmpty()) {
log.debug("Found no lazy workers");
@ -317,7 +325,8 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps);
final AutoScalingData terminated = workerConfig.getAutoScaler()
.terminate(ImmutableList.copyOf(laziestWorkerIps));
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
@ -342,11 +351,17 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
return didTerminate;
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
}
private int maxWorkersToTerminate(Collection<ImmutableWorkerInfo> zkWorkers, WorkerBehaviorConfig workerConfig)
{
final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final int invalidWorkers = zkWorkers.size() - currValidWorkers;
final int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers();
@ -362,12 +377,6 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo
);
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
{
int size = workers.size();


@ -19,25 +19,11 @@
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.TaskRunner;
public class NoopResourceManagementStrategy<T extends TaskRunner> implements ResourceManagementStrategy<T>
interface Provisioner
{
@Override
public void startManagement(T runner)
{
boolean doTerminate();
}
boolean doProvision();
@Override
public void stopManagement()
{
}
@Override
public ScalingStats getStats()
{
return null;
}
ScalingStats getStats();
}
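
Implementations of this interface are expected to be created per runner by a strategy's makeProvisioner() and to keep their own scaling state, as the PendingProvisioner and SimpleProvisioner inner classes in this patch do. A bare-bones sketch (hypothetical class, not part of this patch):

// Hypothetical, minimal Provisioner: never provisions or terminates, exposes no stats.
class NoScalingProvisioner implements Provisioner
{
  @Override
  public boolean doProvision()
  {
    return false; // no new workers requested
  }

  @Override
  public boolean doTerminate()
  {
    return false; // no workers terminated
  }

  @Override
  public ScalingStats getStats()
  {
    return null; // "nothing of interest", which callers treat as absent stats
  }
}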


@ -25,7 +25,7 @@ import org.joda.time.Period;
/**
*/
public class ResourceManagementSchedulerConfig
public class ProvisioningSchedulerConfig
{
@JsonProperty
private boolean doAutoscale = false;


@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import java.io.Closeable;
/**
* The ProvisioningService decides whether worker nodes should be provisioned or terminated
* based on the available tasks and the state of the workers in the system.
*
* A ProvisioningService instance is tied to a single task runner.
*
* @see ProvisioningStrategy#makeProvisioningService
*/
public interface ProvisioningService extends Closeable
{
/**
* Should be called from TaskRunner's lifecycle stop
*/
@Override
void close();
/**
* Get any interesting stats related to scaling
*
* @return The ScalingStats or `null` if nothing of interest
*/
ScalingStats getStats();
}


@ -22,30 +22,15 @@ package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.TaskRunner;
/**
* The ResourceManagementStrategy decides if worker nodes should be provisioned or terminated
* based on the available tasks in the system and the state of the workers in the system.
* In general, the resource management is tied to the runner.
*/
public interface ResourceManagementStrategy<T extends TaskRunner>
public interface ProvisioningStrategy<T extends TaskRunner>
{
/**
* Equivalent to start() but requires a specific runner instance which holds state of interest.
* This method is intended to be called from the TaskRunner's lifecycle
* Creates a new {@link ProvisioningService} for the given {@link TaskRunner}
* This method is intended to be called from the TaskRunner's lifecycle start
*
* @param runner The TaskRunner state holder this strategy should use during execution
*/
void startManagement(T runner);
/**
* Equivalent to stop()
* Should be called from TaskRunner's lifecycle
*/
void stopManagement();
/**
* Get any interesting stats related to scaling
*
* @return The ScalingStats or `null` if nothing of interest
*/
ScalingStats getStats();
ProvisioningService makeProvisioningService(T runner);
}


@ -24,10 +24,10 @@ import com.google.common.base.Predicate;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.java.util.common.ISE;
public class ResourceManagementUtil
public class ProvisioningUtil
{
public static Predicate<ImmutableWorkerInfo> createValidWorkerPredicate(
final SimpleWorkerResourceManagementConfig config
final SimpleWorkerProvisioningConfig config
)
{
return new Predicate<ImmutableWorkerInfo>()
@ -45,7 +45,7 @@ public class ResourceManagementUtil
}
public static Predicate<ImmutableWorkerInfo> createLazyWorkerPredicate(
final SimpleWorkerResourceManagementConfig config
final SimpleWorkerProvisioningConfig config
)
{
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);


@ -24,7 +24,7 @@ import org.joda.time.Period;
/**
*/
public class SimpleWorkerResourceManagementConfig
public class SimpleWorkerProvisioningConfig
{
@JsonProperty
private Period workerIdleTimeout = new Period("PT90m");
@ -49,7 +49,7 @@ public class SimpleWorkerResourceManagementConfig
return workerIdleTimeout;
}
public SimpleWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
public SimpleWorkerProvisioningConfig setWorkerIdleTimeout(Period workerIdleTimeout)
{
this.workerIdleTimeout = workerIdleTimeout;
return this;
@ -60,7 +60,7 @@ public class SimpleWorkerResourceManagementConfig
return maxScalingDuration;
}
public SimpleWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
public SimpleWorkerProvisioningConfig setMaxScalingDuration(Period maxScalingDuration)
{
this.maxScalingDuration = maxScalingDuration;
return this;
@ -71,7 +71,7 @@ public class SimpleWorkerResourceManagementConfig
return numEventsToTrack;
}
public SimpleWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
public SimpleWorkerProvisioningConfig setNumEventsToTrack(int numEventsToTrack)
{
this.numEventsToTrack = numEventsToTrack;
return this;
@ -82,7 +82,7 @@ public class SimpleWorkerResourceManagementConfig
return pendingTaskTimeout;
}
public SimpleWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
public SimpleWorkerProvisioningConfig setPendingTaskTimeout(Period pendingTaskTimeout)
{
this.pendingTaskTimeout = pendingTaskTimeout;
return this;
@ -93,7 +93,7 @@ public class SimpleWorkerResourceManagementConfig
return workerVersion;
}
public SimpleWorkerResourceManagementConfig setWorkerVersion(String workerVersion)
public SimpleWorkerProvisioningConfig setWorkerVersion(String workerVersion)
{
this.workerVersion = workerVersion;
return this;
@ -105,7 +105,7 @@ public class SimpleWorkerResourceManagementConfig
return workerPort;
}
public SimpleWorkerResourceManagementConfig setWorkerPort(int workerPort)
public SimpleWorkerProvisioningConfig setWorkerPort(int workerPort)
{
this.workerPort = workerPort;
return this;


@ -29,14 +29,13 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -48,66 +47,83 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy
public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioningStrategy
{
private static final EmittingLogger log = new EmittingLogger(SimpleWorkerResourceManagementStrategy.class);
private static final EmittingLogger log = new EmittingLogger(SimpleWorkerProvisioningStrategy.class);
private final SimpleWorkerResourceManagementConfig config;
private final SimpleWorkerProvisioningConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
private final Object lock = new Object();
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
@Inject
public SimpleWorkerResourceManagementStrategy(
SimpleWorkerResourceManagementConfig config,
public SimpleWorkerProvisioningStrategy(
SimpleWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorFactory factory
ProvisioningSchedulerConfig provisioningSchedulerConfig
)
{
this(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
factory.create(1, "SimpleResourceManagement-manager--%d")
provisioningSchedulerConfig,
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return ScheduledExecutors.fixed(1, "SimpleResourceManagement-manager--%d");
}
}
);
}
public SimpleWorkerResourceManagementStrategy(
SimpleWorkerResourceManagementConfig config,
public SimpleWorkerProvisioningStrategy(
SimpleWorkerProvisioningConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
ProvisioningSchedulerConfig provisioningSchedulerConfig,
Supplier<ScheduledExecutorService> execFactory
)
{
super(resourceManagementSchedulerConfig, exec);
super(provisioningSchedulerConfig, execFactory);
this.config = config;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
@Override
protected boolean doProvision(WorkerTaskRunner runner)
public Provisioner makeProvisioner(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
synchronized (lock) {
return new SimpleProvisioner(runner);
}
private class SimpleProvisioner implements Provisioner
{
private final WorkerTaskRunner runner;
private final ScalingStats scalingStats = new ScalingStats(config.getNumEventsToTrack());
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
SimpleProvisioner(WorkerTaskRunner runner)
{
this.runner = runner;
}
@Override
public synchronized boolean doProvision()
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null || workerConfig.getAutoScaler() == null) {
log.warn("No workerConfig available, cannot provision new workers.");
log.error("No workerConfig available, cannot provision new workers.");
return false;
}
final Predicate<ImmutableWorkerInfo> isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
@ -134,6 +150,7 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
final AutoScalingData provisioned = workerConfig.getAutoScaler().provision();
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
log.warn("NewNodes is empty, returning from provision loop");
break;
} else {
currentlyProvisioning.addAll(newNodes);
@ -146,9 +163,7 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
if (!currentlyProvisioning.isEmpty()) {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
@ -162,13 +177,11 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
return didProvision;
}
}
@Override
boolean doTerminate(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
synchronized (lock) {
@Override
public synchronized boolean doTerminate()
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot terminate workers.");
@ -203,14 +216,14 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
final Collection<String> laziestWorkerIps =
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, excessWorkers),
@ -260,21 +273,19 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
return didTerminate;
}
}
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ImmutableWorkerInfo> zkWorkers
)
{
synchronized (lock) {
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ImmutableWorkerInfo> zkWorkers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
zkWorkers,
ResourceManagementUtil.createValidWorkerPredicate(config)
ProvisioningUtil.createValidWorkerPredicate(config)
);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
@ -340,11 +351,9 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
);
}
}
}
private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> pendingTasks)
{
synchronized (lock) {
private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> pendingTasks)
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
@ -355,16 +364,12 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour
}
return false;
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
}
public Collection<ImmutableWorkerInfo> getWorkers(WorkerTaskRunner runner)
{
return runner.getWorkers();
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
}


@ -39,7 +39,7 @@ import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import java.util.List;
@ -54,7 +54,7 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
private final int maxNumWorkers;
private final EC2EnvironmentConfig envConfig;
private final AmazonEC2 amazonEC2Client;
private final SimpleWorkerResourceManagementConfig config;
private final SimpleWorkerProvisioningConfig config;
@JsonCreator
public EC2AutoScaler(
@ -62,7 +62,7 @@ public class EC2AutoScaler implements AutoScaler<EC2EnvironmentConfig>
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("envConfig") EC2EnvironmentConfig envConfig,
@JacksonInject AmazonEC2 amazonEC2Client,
@JacksonInject SimpleWorkerResourceManagementConfig config
@JacksonInject SimpleWorkerProvisioningConfig config
)
{
this.minNumWorkers = minNumWorkers;


@ -57,7 +57,7 @@ public class WorkerBehaviorConfig
}
@JsonProperty
public AutoScaler getAutoScaler()
public AutoScaler<?> getAutoScaler()
{
return autoScaler;
}


@ -0,0 +1,84 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Supplier;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class OverlordBlinkLeadershipTest
{
private RemoteTaskRunnerTestUtils rtrUtils;
private final TestRemoteTaskRunnerConfig remoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(new Period("PT5M"));
private final WorkerBehaviorConfig defaultWorkerBehaviourConfig = WorkerBehaviorConfig.defaultConfig();
private final Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier = new Supplier<WorkerBehaviorConfig>()
{
@Override
public WorkerBehaviorConfig get()
{
return defaultWorkerBehaviourConfig;
}
};
private final SimpleWorkerProvisioningStrategy resourceManagement = new SimpleWorkerProvisioningStrategy(
new SimpleWorkerProvisioningConfig(),
workerBehaviorConfigSupplier,
new ProvisioningSchedulerConfig()
);
@Before
public void setUp() throws Exception
{
rtrUtils = new RemoteTaskRunnerTestUtils();
rtrUtils.setUp();
}
@After
public void tearDown() throws Exception
{
rtrUtils.tearDown();
}
/**
* Test that we can start a taskRunner, then stop it (emulating "losing leadership", see {@link
* TaskMaster#stopLeading()}), then create a new taskRunner from the {@link
* org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in
* {@link TaskMaster}, and start it again.
*/
@Test(timeout = 10_000)
public void testOverlordBlinkLeadership()
{
try {
RemoteTaskRunner remoteTaskRunner1 = rtrUtils.makeRemoteTaskRunner(remoteTaskRunnerConfig, resourceManagement);
remoteTaskRunner1.stop();
RemoteTaskRunner remoteTaskRunner2 = rtrUtils.makeRemoteTaskRunner(remoteTaskRunnerConfig, resourceManagement);
remoteTaskRunner2.stop();
}
catch (Exception e) {
Assert.fail("Should have not thrown any exceptions, thrown: " + e);
}
}
}


@ -26,9 +26,9 @@ import com.google.common.base.Supplier;
import com.metamx.http.client.HttpClient;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -109,8 +109,8 @@ public class RemoteTaskRunnerFactoryTest
return ScheduledExecutors.fixed(i, s);
}
};
SimpleWorkerResourceManagementConfig resourceManagementConfig = new SimpleWorkerResourceManagementConfig();
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig = new ResourceManagementSchedulerConfig()
SimpleWorkerProvisioningConfig provisioningConfig = new SimpleWorkerProvisioningConfig();
ProvisioningSchedulerConfig provisioningSchedulerConfig = new ProvisioningSchedulerConfig()
{
@Override
public boolean isDoAutoscale()
@ -126,19 +126,18 @@ public class RemoteTaskRunnerFactoryTest
httpClient,
workerBehaviorConfig,
executorFactory,
resourceManagementSchedulerConfig,
new SimpleWorkerResourceManagementStrategy(
resourceManagementConfig,
provisioningSchedulerConfig,
new SimpleWorkerProvisioningStrategy(
provisioningConfig,
workerBehaviorConfig,
resourceManagementSchedulerConfig,
executorFactory
provisioningSchedulerConfig
)
);
Assert.assertEquals(1, executorCount.get());
Assert.assertEquals(0, executorCount.get());
RemoteTaskRunner remoteTaskRunner1 = factory.build();
Assert.assertEquals(2, executorCount.get());
Assert.assertEquals(1, executorCount.get());
RemoteTaskRunner remoteTaskRunner2 = factory.build();
Assert.assertEquals(3, executorCount.get());
Assert.assertEquals(2, executorCount.get());
}
}
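The flipped executor counts encode the new expectation: the factory no longer creates a scheduled executor when it is constructed, and each build() presumably creates exactly one cleanup executor for its runner. A hypothetical, self-contained stand-in for that contract (the real RemoteTaskRunnerFactory internals are not shown in this hunk):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the asserted contract only: construction allocates
// nothing, each build() allocates one executor.
class CountingRunnerFactory
{
  final AtomicInteger executorCount = new AtomicInteger();

  Object build()
  {
    executorCount.incrementAndGet(); // one cleanup executor per built runner
    return new Object();             // stand-in for a RemoteTaskRunner
  }

  public static void main(String[] args)
  {
    CountingRunnerFactory factory = new CountingRunnerFactory();
    System.out.println(factory.executorCount.get()); // 0, nothing built yet
    factory.build();
    System.out.println(factory.executorCount.get()); // 1
    factory.build();
    System.out.println(factory.executorCount.get()); // 2
  }
}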

View File

@@ -22,7 +22,6 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.PathChildrenCacheFactory;
@@ -31,7 +30,8 @@ import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
@@ -103,6 +103,15 @@ public class RemoteTaskRunnerTestUtils
}
RemoteTaskRunner makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception
{
NoopProvisioningStrategy<WorkerTaskRunner> provisioningStrategy = new NoopProvisioningStrategy<>();
return makeRemoteTaskRunner(config, provisioningStrategy);
}
public RemoteTaskRunner makeRemoteTaskRunner(
RemoteTaskRunnerConfig config,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
@@ -122,7 +131,7 @@ public class RemoteTaskRunnerTestUtils
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<WorkerTaskRunner>()
provisioningStrategy
);
remoteTaskRunner.start();

View File

@@ -66,7 +66,7 @@ public class EC2AutoScalerTest
private DescribeInstancesResult describeInstancesResult;
private Reservation reservation;
private Instance instance;
private SimpleWorkerResourceManagementConfig managementConfig;
private SimpleWorkerProvisioningConfig managementConfig;
@Before
public void setUp() throws Exception
@@ -81,7 +81,7 @@ public class EC2AutoScalerTest
.withImageId(AMI_ID)
.withPrivateIpAddress(IP);
managementConfig = new SimpleWorkerResourceManagementConfig().setWorkerPort(8080).setWorkerVersion("");
managementConfig = new SimpleWorkerProvisioningConfig().setWorkerPort(8080).setWorkerVersion("");
}
@After

View File

@@ -20,6 +20,7 @@
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -59,12 +60,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class PendingTaskBasedResourceManagementStrategyTest
public class PendingTaskBasedProvisioningStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
private PendingTaskBasedWorkerResourceManagementConfig config;
private PendingTaskBasedWorkerResourceManagementStrategy strategy;
private PendingTaskBasedWorkerProvisioningStrategy strategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
private final static String MIN_VERSION = "2014-01-00T00:01:00Z";
@@ -77,7 +77,7 @@ public class PendingTaskBasedResourceManagementStrategyTest
testTask = TestTasks.immediateSuccess("task1");
config = new PendingTaskBasedWorkerResourceManagementConfig()
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
@@ -91,11 +91,18 @@ public class PendingTaskBasedResourceManagementStrategyTest
)
);
strategy = new PendingTaskBasedWorkerResourceManagementStrategy(
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ResourceManagementSchedulerConfig(),
executorService
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
}
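The strategy now takes a Supplier<ScheduledExecutorService> rather than an executor directly, presumably so an executor can be obtained lazily per ProvisioningService. For this test a single shared executor is all that is needed, so the anonymous class above is a drop-in for Guava's Suppliers.ofInstance (the same applies to the SimpleProvisioningStrategyTest setup further down); a sketch of the equivalent wiring:

// Equivalent construction using com.google.common.base.Suppliers, reusing the
// test's single shared executorService:
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
    config,
    DSuppliers.of(workerConfig),
    new ProvisioningSchedulerConfig(),
    Suppliers.ofInstance(executorService)
);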
@@ -120,10 +127,11 @@ public class PendingTaskBasedResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList("aNode"))
).times(3);
EasyMock.replay(runner, autoScaler);
boolean provisionedSomething = strategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 3);
for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
Assert.assertTrue(provisioner.getStats().toList().size() == 3);
for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
Assert.assertTrue(
event.getEvent() == ScalingStats.EVENT.PROVISION
);
@@ -153,10 +161,11 @@ public class PendingTaskBasedResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList("aNode"))
).times(2);
EasyMock.replay(runner, autoScaler);
boolean provisionedSomething = strategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 2);
for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
Assert.assertTrue(provisioner.getStats().toList().size() == 2);
for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
Assert.assertTrue(
event.getEvent() == ScalingStats.EVENT.PROVISION
);
@@ -187,10 +196,11 @@ public class PendingTaskBasedResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList("aNode"))
).times(2);
EasyMock.replay(runner, autoScaler);
boolean provisionedSomething = strategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 2);
for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) {
Assert.assertTrue(provisioner.getStats().toList().size() == 2);
for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
Assert.assertTrue(
event.getEvent() == ScalingStats.EVENT.PROVISION
);
@@ -223,22 +233,23 @@ public class PendingTaskBasedResourceManagementStrategyTest
EasyMock.replay(runner);
EasyMock.replay(autoScaler);
boolean provisionedSomething = strategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 1);
DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
provisionedSomething = strategy.doProvision(runner);
provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp();
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);
@@ -282,24 +293,25 @@ public class PendingTaskBasedResourceManagementStrategyTest
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
EasyMock.replay(runner);
boolean provisionedSomething = strategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 1);
DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
Thread.sleep(2000);
provisionedSomething = strategy.doProvision(runner);
provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp();
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);
@@ -343,12 +355,13 @@ public class PendingTaskBasedResourceManagementStrategyTest
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner);
boolean terminatedSomething = strategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScaler);
@@ -380,20 +393,21 @@ public class PendingTaskBasedResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = strategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
terminatedSomething = strategy.doTerminate(runner);
terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
Assert.assertTrue(strategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScaler);
@@ -430,7 +444,8 @@ public class PendingTaskBasedResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = strategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@@ -442,7 +457,7 @@ public class PendingTaskBasedResourceManagementStrategyTest
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = strategy.doProvision(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
@@ -476,9 +491,8 @@ public class PendingTaskBasedResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = strategy.doTerminate(
runner
);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@@ -489,9 +503,7 @@ public class PendingTaskBasedResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = strategy.doProvision(
runner
);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
@@ -509,9 +521,7 @@ public class PendingTaskBasedResourceManagementStrategyTest
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScaler);
provisionedSomething = strategy.doProvision(
runner
);
provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
@@ -536,13 +546,10 @@ public class PendingTaskBasedResourceManagementStrategyTest
).times(2);
EasyMock.replay(runner);
boolean terminatedSomething = strategy.doTerminate(
runner
);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
boolean provisionedSomething = strategy.doProvision(
runner
);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);

View File

@@ -20,6 +20,7 @@
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -58,11 +59,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class SimpleResourceManagementStrategyTest
public class SimpleProvisioningStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
private SimpleWorkerResourceManagementStrategy simpleResourceManagementStrategy;
private SimpleWorkerProvisioningStrategy strategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
@@ -72,14 +73,14 @@ public class SimpleResourceManagementStrategyTest
autoScaler = EasyMock.createMock(AutoScaler.class);
testTask = TestTasks.immediateSuccess("task1");
final SimpleWorkerResourceManagementConfig simpleWorkerResourceManagementConfig = new SimpleWorkerResourceManagementConfig()
final SimpleWorkerProvisioningConfig simpleWorkerProvisioningConfig = new SimpleWorkerProvisioningConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion("");
final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig();
final ProvisioningSchedulerConfig schedulerConfig = new ProvisioningSchedulerConfig();
workerConfig = new AtomicReference<>(
new WorkerBehaviorConfig(
@@ -88,11 +89,18 @@ public class SimpleResourceManagementStrategyTest
)
);
simpleResourceManagementStrategy = new SimpleWorkerResourceManagementStrategy(
simpleWorkerResourceManagementConfig,
strategy = new SimpleWorkerProvisioningStrategy(
simpleWorkerProvisioningConfig,
DSuppliers.of(workerConfig),
schedulerConfig,
executorService
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
}
@@ -126,12 +134,13 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(runner);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
EasyMock.verify(autoScaler);
@@ -162,22 +171,23 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(runner);
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);
@@ -218,24 +228,25 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.replay(runner);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
Thread.sleep(2000);
provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);
@@ -276,12 +287,13 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScaler);
@@ -319,20 +331,21 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertTrue(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
Assert.assertTrue(provisioner.getStats().toList().size() == 1);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
);
EasyMock.verify(autoScaler);
@@ -368,7 +381,8 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@@ -380,7 +394,7 @@ public class SimpleResourceManagementStrategyTest
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
@@ -413,9 +427,8 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
runner
);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScaler);
@@ -426,9 +439,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
runner
);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScaler);
@@ -446,9 +457,7 @@ public class SimpleResourceManagementStrategyTest
new AutoScalingData(Lists.newArrayList("h4"))
);
EasyMock.replay(autoScaler);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
runner
);
provisionedSomething = provisioner.doProvision();
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScaler);
EasyMock.verify(runner);
@@ -473,13 +482,10 @@ public class SimpleResourceManagementStrategyTest
).times(1);
EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
runner
);
Provisioner provisioner = strategy.makeProvisioner(runner);
boolean terminatedSomething = provisioner.doTerminate();
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
runner
);
boolean provisionedSomething = provisioner.doProvision();
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);
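Both strategy tests above now drive scaling through a per-runner Provisioner rather than calling doProvision/doTerminate on the strategy with the runner as an argument. Condensed from the test usage (only the methods the tests actually call are shown):

// Condensed illustration, not the full Provisioner interface:
Provisioner provisioner = strategy.makeProvisioner(runner); // one Provisioner per runner
boolean provisioned = provisioner.doProvision();            // may ask the AutoScaler for new nodes
boolean terminated = provisioner.doTerminate();             // may terminate idle/lazy workers
ScalingStats stats = provisioner.getStats();                // records PROVISION / TERMINATE events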

View File

@@ -21,7 +21,6 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
@@ -38,8 +37,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -106,9 +103,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
return jsonMapper.readValue(bytes, typeReference);
}
catch (IOException e) {
CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray());
CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes));
log.error(e, "Could not parse json: %s", charBuffer.toString());
log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
throw Throwables.propagate(e);
}
}

View File

@@ -60,12 +60,12 @@ import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig;
import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.helpers.OverlordHelper;
import io.druid.indexing.overlord.helpers.TaskLogAutoCleaner;
@@ -230,26 +230,26 @@ public class CliOverlord extends ServerRunnable
private void configureAutoscale(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ProvisioningSchedulerConfig.class);
JsonConfigProvider.bind(
binder,
"druid.indexer.autoscale",
PendingTaskBasedWorkerResourceManagementConfig.class
PendingTaskBasedWorkerProvisioningConfig.class
);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerResourceManagementConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerProvisioningConfig.class);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy.type",
Key.get(ResourceManagementStrategy.class),
Key.get(SimpleWorkerResourceManagementStrategy.class)
Key.get(ProvisioningStrategy.class),
Key.get(SimpleWorkerProvisioningStrategy.class)
);
final MapBinder<String, ResourceManagementStrategy> biddy = PolyBind.optionBinder(
final MapBinder<String, ProvisioningStrategy> biddy = PolyBind.optionBinder(
binder,
Key.get(ResourceManagementStrategy.class)
Key.get(ProvisioningStrategy.class)
);
biddy.addBinding("simple").to(SimpleWorkerResourceManagementStrategy.class);
biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class);
biddy.addBinding("simple").to(SimpleWorkerProvisioningStrategy.class);
biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerProvisioningStrategy.class);
}
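Operationally, the rebinding above keeps the same selection property; only the implementation classes behind it changed. An illustrative overlord runtime property, using the key and option names from the bindings above ("simple", the default binding, keeps SimpleWorkerProvisioningStrategy):

# selects PendingTaskBasedWorkerProvisioningStrategy instead of the default "simple"
druid.indexer.autoscale.strategy.type=pendingTaskBased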