diff --git a/common/src/main/java/io/druid/concurrent/LifecycleLock.java b/common/src/main/java/io/druid/concurrent/LifecycleLock.java index 7aff1783ec4..e872344bb70 100644 --- a/common/src/main/java/io/druid/concurrent/LifecycleLock.java +++ b/common/src/main/java/io/druid/concurrent/LifecycleLock.java @@ -212,7 +212,8 @@ public final class LifecycleLock /** * Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called - * before that. + * before that. Returns {@code false} if {@link #started()} is not called before {@link #exitStart()}, or if {@link + * #canStop()} is already called on this LifecycleLock. */ public boolean awaitStarted() { @@ -222,7 +223,8 @@ public final class LifecycleLock /** * Awaits until {@link #exitStart()} is called for at most the specified timeout, and returns {@code true} if {@link * #started()} was called before that. Returns {@code false} if {@code started()} wasn't called before {@code - * exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires. + * exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires, or + * if {@link #canStop()} is already called on this LifecycleLock. */ public boolean awaitStarted(long timeout, TimeUnit unit) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 9b582dd1875..4cc533bebfa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -51,12 +51,14 @@ import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; import io.druid.curator.CuratorUtils; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ProvisioningService; +import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; @@ -172,12 +174,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer private final Object statusLock = new Object(); - private volatile boolean started = false; + private final LifecycleLock lifecycleLock = new LifecycleLock(); private final ListeningScheduledExecutorService cleanupExec; private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>(); - private final ResourceManagementStrategy resourceManagement; + private final ProvisioningStrategy provisioningStrategy; + private ProvisioningService provisioningService; public RemoteTaskRunner( ObjectMapper jsonMapper, @@ -188,7 +191,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer HttpClient httpClient, Supplier workerConfigRef, ScheduledExecutorService cleanupExec, - ResourceManagementStrategy resourceManagement + ProvisioningStrategy provisioningStrategy ) { this.jsonMapper = jsonMapper; @@ -205,7 +208,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer this.httpClient = httpClient; this.workerConfigRef = workerConfigRef; this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec); - this.resourceManagement = resourceManagement; + this.provisioningStrategy = provisioningStrategy; this.runPendingTasksExec = Execs.multiThreaded( config.getPendingTasksRunnerNumThreads(), "rtr-pending-tasks-runner-%d" @@ -216,11 +219,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @LifecycleStart public void start() { + if (!lifecycleLock.canStart()) { + return; + } try { - if (started) { - return; - } - final MutableInt waitingFor = new MutableInt(1); final Object waitingForMonitor = new Object(); @@ -327,25 +329,26 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } } scheduleBlackListedNodesCleanUp(); - resourceManagement.startManagement(this); - started = true; + provisioningService = provisioningStrategy.makeProvisioningService(this); + lifecycleLock.started(); } catch (Exception e) { throw Throwables.propagate(e); } + finally { + lifecycleLock.exitStart(); + } } @Override @LifecycleStop public void stop() { + if (!lifecycleLock.canStop()) { + return; + } try { - if (!started) { - return; - } - started = false; - - resourceManagement.stopManagement(); + provisioningService.close(); Closer closer = Closer.create(); for (ZkWorker zkWorker : zkWorkers.values()) { @@ -452,7 +455,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @Override public Optional getScalingStats() { - return Optional.fromNullable(resourceManagement.getStats()); + return Optional.fromNullable(provisioningService.getStats()); } public ZkWorker findWorkerRunningTask(String taskId) @@ -510,8 +513,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @Override public void shutdown(final String taskId) { - if (!started) { - log.info("This TaskRunner is stopped. Ignoring shutdown command for task: %s", taskId); + if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { + log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId); } else if (pendingTasks.remove(taskId) != null) { pendingTaskPayloads.remove(taskId); log.info("Removed task from pending queue: %s", taskId); @@ -693,7 +696,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer */ private void cleanup(final String taskId) { - if (!started) { + if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { return; } final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId); @@ -1259,7 +1262,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer throw Throwables.propagate(e); } } - return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values())); + return getWorkerFromZK(lazyWorkers.values()); } } @@ -1288,7 +1291,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @Override public Collection getLazyWorkers() { - return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values())); + return getWorkerFromZK(lazyWorkers.values()); } private static ImmutableList getImmutableWorkerFromZK(Collection workers) @@ -1308,18 +1311,20 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ); } - public static Collection getWorkerFromZK(Collection workers) + private static ImmutableList getWorkerFromZK(Collection workers) { - return Collections2.transform( - workers, - new Function() - { - @Override - public Worker apply(ZkWorker input) + return ImmutableList.copyOf( + Collections2.transform( + workers, + new Function() { - return input.getWorker(); + @Override + public Worker apply(ZkWorker input) + { + return input.getWorker(); + } } - } + ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 66f8152fe82..d0e621ae592 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -25,9 +25,9 @@ import com.google.inject.Inject; import com.metamx.http.client.HttpClient; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.guice.annotations.Global; -import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; +import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -45,8 +45,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory workerConfigRef; - private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; - private final ResourceManagementStrategy resourceManagementStrategy; + private final ProvisioningSchedulerConfig provisioningSchedulerConfig; + private final ProvisioningStrategy provisioningStrategy; private final ScheduledExecutorFactory factory; @Inject @@ -58,8 +58,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory workerConfigRef, final ScheduledExecutorFactory factory, - final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - final ResourceManagementStrategy resourceManagementStrategy + final ProvisioningSchedulerConfig provisioningSchedulerConfig, + final ProvisioningStrategy provisioningStrategy ) { this.curator = curator; @@ -68,8 +68,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory() + provisioningSchedulerConfig.isDoAutoscale() + ? provisioningStrategy + : new NoopProvisioningStrategy<>() ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerProvisioningStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerProvisioningStrategy.java new file mode 100644 index 00000000000..89915853e37 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerProvisioningStrategy.java @@ -0,0 +1,131 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.google.common.base.Supplier; +import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.overlord.WorkerTaskRunner; +import io.druid.java.util.common.granularity.PeriodGranularity; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + */ +public abstract class AbstractWorkerProvisioningStrategy implements ProvisioningStrategy +{ + private static final EmittingLogger log = new EmittingLogger(AbstractWorkerProvisioningStrategy.class); + + private final ProvisioningSchedulerConfig provisioningSchedulerConfig; + private final Supplier execFactory; + + AbstractWorkerProvisioningStrategy( + ProvisioningSchedulerConfig provisioningSchedulerConfig, + Supplier execFactory + ) + { + this.provisioningSchedulerConfig = provisioningSchedulerConfig; + this.execFactory = execFactory; + } + + @Override + public ProvisioningService makeProvisioningService(WorkerTaskRunner runner) + { + return new WorkerProvisioningService(makeProvisioner(runner)); + } + + final class WorkerProvisioningService implements ProvisioningService + { + private final ScheduledExecutorService exec = execFactory.get(); + private final Provisioner provisioner; + + WorkerProvisioningService(final Provisioner provisioner) + { + log.info("Started Resource Management Scheduler"); + this.provisioner = provisioner; + + long rate = provisioningSchedulerConfig.getProvisionPeriod().toStandardDuration().getMillis(); + exec.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + try { + provisioner.doProvision(); + } + catch (Exception e) { + log.error(e, "Uncaught exception."); + } + } + }, + rate, + rate, + TimeUnit.MILLISECONDS + ); + + // Schedule termination of worker nodes periodically + Period period = provisioningSchedulerConfig.getTerminatePeriod(); + PeriodGranularity granularity = new PeriodGranularity( + period, + provisioningSchedulerConfig.getOriginTime(), + null + ); + final long startTime = granularity.bucketEnd(new DateTime()).getMillis(); + + exec.scheduleAtFixedRate( + new Runnable() + { + @Override + public void run() + { + try { + provisioner.doTerminate(); + } + catch (Exception e) { + log.error(e, "Uncaught exception."); + } + } + }, + new Duration(System.currentTimeMillis(), startTime).getMillis(), + provisioningSchedulerConfig.getTerminatePeriod().toStandardDuration().getMillis(), + TimeUnit.MILLISECONDS + ); + } + + @Override + public ScalingStats getStats() + { + return provisioner.getStats(); + } + + @Override + public void close() + { + log.info("Stopping Resource Management Scheduler"); + exec.shutdown(); + } + } + + protected abstract Provisioner makeProvisioner(WorkerTaskRunner runner); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java deleted file mode 100644 index 26898139046..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord.autoscaling; - -import com.metamx.emitter.EmittingLogger; -import io.druid.concurrent.LifecycleLock; -import io.druid.indexing.overlord.WorkerTaskRunner; -import io.druid.java.util.common.granularity.PeriodGranularity; -import io.druid.java.util.common.concurrent.ScheduledExecutors; - -import org.joda.time.DateTime; -import org.joda.time.Duration; -import org.joda.time.Period; - -import java.util.concurrent.ScheduledExecutorService; - -/** - */ -public abstract class AbstractWorkerResourceManagementStrategy implements ResourceManagementStrategy -{ - private static final EmittingLogger log = new EmittingLogger(AbstractWorkerResourceManagementStrategy.class); - - private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; - private final ScheduledExecutorService exec; - private final LifecycleLock lifecycleLock = new LifecycleLock(); - - protected AbstractWorkerResourceManagementStrategy( - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - ScheduledExecutorService exec - ) - { - this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; - this.exec = exec; - } - - @Override - public void startManagement(final WorkerTaskRunner runner) - { - if (!lifecycleLock.canStart()) { - return; - } - try { - - log.info("Started Resource Management Scheduler"); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - // Any Errors are caught by ScheduledExecutors - doProvision(runner); - } - } - ); - - // Schedule termination of worker nodes periodically - Period period = resourceManagementSchedulerConfig.getTerminatePeriod(); - PeriodGranularity granularity = new PeriodGranularity( - period, - resourceManagementSchedulerConfig.getOriginTime(), - null - ); - final long startTime = granularity.bucketEnd(new DateTime()).getMillis(); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - new Duration(System.currentTimeMillis(), startTime), - resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - // Any Errors are caught by ScheduledExecutors - doTerminate(runner); - } - } - ); - lifecycleLock.started(); - } - finally { - lifecycleLock.exitStart(); - } - } - - abstract boolean doTerminate(WorkerTaskRunner runner); - - abstract boolean doProvision(WorkerTaskRunner runner); - - @Override - public void stopManagement() - { - if (!lifecycleLock.canStop()) { - return; - } - log.info("Stopping Resource Management Scheduler"); - exec.shutdown(); - } - -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopProvisioningStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopProvisioningStrategy.java new file mode 100644 index 00000000000..2effc0372cf --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopProvisioningStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import io.druid.indexing.overlord.TaskRunner; + +public class NoopProvisioningStrategy implements ProvisioningStrategy +{ + @Override + public ProvisioningService makeProvisioningService(T runner) + { + return new ProvisioningService() + { + @Override + public void close() + { + // nothing to close + } + + @Override + public ScalingStats getStats() + { + return null; + } + }; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java similarity index 69% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java index 7d120ed5c4d..bf1be2482dc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java @@ -24,7 +24,7 @@ import org.joda.time.Period; /** */ -public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorkerResourceManagementConfig +public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvisioningConfig { @JsonProperty private int maxScalingStep = 10; @@ -35,42 +35,42 @@ public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorker return maxScalingStep; } - public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingStep(int maxScalingStep) + public PendingTaskBasedWorkerProvisioningConfig setMaxScalingStep(int maxScalingStep) { this.maxScalingStep = maxScalingStep; return this; } @Override - public PendingTaskBasedWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) + public PendingTaskBasedWorkerProvisioningConfig setWorkerIdleTimeout(Period workerIdleTimeout) { super.setWorkerIdleTimeout(workerIdleTimeout); return this; } @Override - public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) + public PendingTaskBasedWorkerProvisioningConfig setMaxScalingDuration(Period maxScalingDuration) { super.setMaxScalingDuration(maxScalingDuration); return this; } @Override - public PendingTaskBasedWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) + public PendingTaskBasedWorkerProvisioningConfig setNumEventsToTrack(int numEventsToTrack) { super.setNumEventsToTrack(numEventsToTrack); return this; } @Override - public PendingTaskBasedWorkerResourceManagementConfig setWorkerVersion(String workerVersion) + public PendingTaskBasedWorkerProvisioningConfig setWorkerVersion(String workerVersion) { super.setWorkerVersion(workerVersion); return this; } @Override - public PendingTaskBasedWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) + public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pendingTaskTimeout) { super.setPendingTaskTimeout(pendingTaskTimeout); return this; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java similarity index 57% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 3acb0fe958a..8663e8c7fd9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -27,11 +27,11 @@ import com.google.common.base.Supplier; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; @@ -40,11 +40,10 @@ import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.Worker; -import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; - import org.joda.time.DateTime; import org.joda.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -53,58 +52,76 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy +public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerProvisioningStrategy { - private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class); + private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class); private static final String SCHEME = "http"; - private final PendingTaskBasedWorkerResourceManagementConfig config; + private final PendingTaskBasedWorkerProvisioningConfig config; private final Supplier workerConfigRef; - private final ScalingStats scalingStats; - - private final Object lock = new Object(); - private final Set currentlyProvisioning = Sets.newHashSet(); - private final Set currentlyTerminating = Sets.newHashSet(); - - private DateTime lastProvisionTime = new DateTime(); - private DateTime lastTerminateTime = new DateTime(); @Inject - public PendingTaskBasedWorkerResourceManagementStrategy( - PendingTaskBasedWorkerResourceManagementConfig config, + public PendingTaskBasedWorkerProvisioningStrategy( + PendingTaskBasedWorkerProvisioningConfig config, Supplier workerConfigRef, - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - ScheduledExecutorFactory factory + ProvisioningSchedulerConfig provisioningSchedulerConfig ) { this( config, workerConfigRef, - resourceManagementSchedulerConfig, - factory.create(1, "PendingTaskBasedResourceManagement-manager--%d") + provisioningSchedulerConfig, + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return ScheduledExecutors.fixed(1, "PendingTaskBasedWorkerProvisioning-manager--%d"); + } + } ); } - public PendingTaskBasedWorkerResourceManagementStrategy( - PendingTaskBasedWorkerResourceManagementConfig config, + public PendingTaskBasedWorkerProvisioningStrategy( + PendingTaskBasedWorkerProvisioningConfig config, Supplier workerConfigRef, - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - ScheduledExecutorService exec + ProvisioningSchedulerConfig provisioningSchedulerConfig, + Supplier execFactory ) { - super(resourceManagementSchedulerConfig, exec); + super(provisioningSchedulerConfig, execFactory); this.config = config; this.workerConfigRef = workerConfigRef; - this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); } @Override - public boolean doProvision(WorkerTaskRunner runner) + public Provisioner makeProvisioner(WorkerTaskRunner runner) { - Collection pendingTasks = runner.getPendingTaskPayloads(); - Collection workers = runner.getWorkers(); - synchronized (lock) { + return new PendingProvisioner(runner); + } + + private class PendingProvisioner implements Provisioner + { + private final WorkerTaskRunner runner; + private final ScalingStats scalingStats = new ScalingStats(config.getNumEventsToTrack()); + + private final Set currentlyProvisioning = Sets.newHashSet(); + private final Set currentlyTerminating = Sets.newHashSet(); + + private DateTime lastProvisionTime = new DateTime(); + private DateTime lastTerminateTime = new DateTime(); + + private PendingProvisioner(WorkerTaskRunner runner) + { + this.runner = runner; + } + + @Override + public synchronized boolean doProvision() + { + Collection pendingTasks = runner.getPendingTaskPayloads(); + Collection workers = runner.getWorkers(); boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null || workerConfig.getAutoScaler() == null) { @@ -136,8 +153,8 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo ); while (want > 0) { final AutoScalingData provisioned = workerConfig.getAutoScaler().provision(); - final List newNodes = provisioned == null ? ImmutableList.of() : provisioned.getNodeIds(); - if (newNodes.isEmpty()) { + final List newNodes; + if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) { log.warn("NewNodes is empty, returning from provision loop"); break; } else { @@ -164,111 +181,104 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo return didProvision; } - } - private static Collection getWorkerNodeIDs(Collection workers, WorkerBehaviorConfig workerConfig) - { - return workerConfig.getAutoScaler().ipToIdLookup( - Lists.newArrayList( - Iterables.transform( - workers, - new Function() - { - @Override - public String apply(Worker input) - { - return input.getIp(); - } - } - ) - ) - ); - } - - int getScaleUpNodeCount( - final WorkerTaskRunnerConfig remoteTaskRunnerConfig, - final WorkerBehaviorConfig workerConfig, - final Collection pendingTasks, - final Collection workers - ) - { - final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); - final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); - final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); - final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); - - // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need since - // we are not aware of the expectedWorkerCapacity. - int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks( - remoteTaskRunnerConfig, - workerConfig, - pendingTasks, - workers - ); - - int want = Math.max( - minWorkerCount - currValidWorkers, - // Additional workers needed to reach minWorkerCount - Math.min(config.getMaxScalingStep(), moreWorkersNeeded) - // Additional workers needed to run current pending tasks - ); - - if (want > 0 && currValidWorkers >= maxWorkerCount) { - log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount); - return 0; - } - want = Math.min(want, maxWorkerCount - currValidWorkers); - return want; - } - - int getWorkersNeededToAssignTasks( - final WorkerTaskRunnerConfig workerTaskRunnerConfig, - final WorkerBehaviorConfig workerConfig, - final Collection pendingTasks, - final Collection workers - ) - { - final Collection validWorkers = Collections2.filter( - workers, - ResourceManagementUtil.createValidWorkerPredicate(config) - ); - - Map workersMap = Maps.newHashMap(); - for (ImmutableWorkerInfo worker : validWorkers) { - workersMap.put(worker.getWorker().getHost(), worker); - } - WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); - int need = 0; - int capacity = getExpectedWorkerCapacity(workers); - - // Simulate assigning tasks to dummy workers using configured workerSelectStrategy - // the number of additional workers needed to assign all the pending tasks is noted - for (Task task : pendingTasks) { - Optional selectedWorker = workerSelectStrategy.findWorkerForTask( - workerTaskRunnerConfig, - ImmutableMap.copyOf(workersMap), - task - ); - final ImmutableWorkerInfo workerRunningTask; - if (selectedWorker.isPresent()) { - workerRunningTask = selectedWorker.get(); - } else { - // None of the existing worker can run this task, we need to provision one worker for it. - // create a dummy worker and try to simulate assigning task to it. - workerRunningTask = createDummyWorker(SCHEME, "dummy" + need, capacity, workerTaskRunnerConfig.getMinWorkerVersion()); - need++; + private Collection getWorkerNodeIDs(Collection workers, WorkerBehaviorConfig workerConfig) + { + List ips = new ArrayList<>(workers.size()); + for (Worker worker : workers) { + ips.add(worker.getIp()); } - // Update map with worker running task - workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task)); + return workerConfig.getAutoScaler().ipToIdLookup(ips); } - return need; - } - @Override - public boolean doTerminate(WorkerTaskRunner runner) - { - Collection zkWorkers = runner.getWorkers(); - synchronized (lock) { + private int getScaleUpNodeCount( + final WorkerTaskRunnerConfig remoteTaskRunnerConfig, + final WorkerBehaviorConfig workerConfig, + final Collection pendingTasks, + final Collection workers + ) + { + final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); + final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); + final Predicate isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config); + final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); + + // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need since + // we are not aware of the expectedWorkerCapacity. + int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks( + remoteTaskRunnerConfig, + workerConfig, + pendingTasks, + workers + ); + + int want = Math.max( + minWorkerCount - currValidWorkers, + // Additional workers needed to reach minWorkerCount + Math.min(config.getMaxScalingStep(), moreWorkersNeeded) + // Additional workers needed to run current pending tasks + ); + + if (want > 0 && currValidWorkers >= maxWorkerCount) { + log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d].", currValidWorkers, maxWorkerCount); + return 0; + } + want = Math.min(want, maxWorkerCount - currValidWorkers); + return want; + } + + private int getWorkersNeededToAssignTasks( + final WorkerTaskRunnerConfig workerTaskRunnerConfig, + final WorkerBehaviorConfig workerConfig, + final Collection pendingTasks, + final Collection workers + ) + { + final Collection validWorkers = Collections2.filter( + workers, + ProvisioningUtil.createValidWorkerPredicate(config) + ); + + Map workersMap = Maps.newHashMap(); + for (ImmutableWorkerInfo worker : validWorkers) { + workersMap.put(worker.getWorker().getHost(), worker); + } + WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); + int need = 0; + int capacity = getExpectedWorkerCapacity(workers); + + // Simulate assigning tasks to dummy workers using configured workerSelectStrategy + // the number of additional workers needed to assign all the pending tasks is noted + for (Task task : pendingTasks) { + Optional selectedWorker = workerSelectStrategy.findWorkerForTask( + workerTaskRunnerConfig, + ImmutableMap.copyOf(workersMap), + task + ); + final ImmutableWorkerInfo workerRunningTask; + if (selectedWorker.isPresent()) { + workerRunningTask = selectedWorker.get(); + } else { + // None of the existing worker can run this task, we need to provision one worker for it. + // create a dummy worker and try to simulate assigning task to it. + workerRunningTask = createDummyWorker( + SCHEME, + "dummy" + need, + capacity, + workerTaskRunnerConfig.getMinWorkerVersion() + ); + need++; + } + // Update map with worker running task + workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task)); + } + return need; + } + + @Override + public synchronized boolean doTerminate() + { + Collection zkWorkers = runner.getWorkers(); final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null) { log.warn("No workerConfig available, cannot terminate workers."); @@ -293,20 +303,18 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo if (currentlyTerminating.isEmpty()) { final int maxWorkersToTerminate = maxWorkersToTerminate(zkWorkers, workerConfig); - final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); - final List laziestWorkerIps = - Lists.newArrayList( - Collections2.transform( - runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate), - new Function() - { - @Override - public String apply(Worker zkWorker) - { - return zkWorker.getIp(); - } - } - ) + final Predicate isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config); + final Collection laziestWorkerIps = + Collections2.transform( + runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate), + new Function() + { + @Override + public String apply(Worker zkWorker) + { + return zkWorker.getIp(); + } + } ); if (laziestWorkerIps.isEmpty()) { log.debug("Found no lazy workers"); @@ -317,7 +325,8 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo Joiner.on(", ").join(laziestWorkerIps) ); - final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps); + final AutoScalingData terminated = workerConfig.getAutoScaler() + .terminate(ImmutableList.copyOf(laziestWorkerIps)); if (terminated != null) { currentlyTerminating.addAll(terminated.getNodeIds()); lastTerminateTime = new DateTime(); @@ -342,11 +351,17 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo return didTerminate; } + + @Override + public ScalingStats getStats() + { + return scalingStats; + } } private int maxWorkersToTerminate(Collection zkWorkers, WorkerBehaviorConfig workerConfig) { - final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); + final Predicate isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config); final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); final int invalidWorkers = zkWorkers.size() - currValidWorkers; final int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers(); @@ -362,12 +377,6 @@ public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWo ); } - @Override - public ScalingStats getStats() - { - return scalingStats; - } - private static int getExpectedWorkerCapacity(final Collection workers) { int size = workers.size(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/Provisioner.java similarity index 71% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/Provisioner.java index 3cd19f966ff..fea7cd8f242 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/Provisioner.java @@ -19,25 +19,11 @@ package io.druid.indexing.overlord.autoscaling; -import io.druid.indexing.overlord.TaskRunner; - -public class NoopResourceManagementStrategy implements ResourceManagementStrategy +interface Provisioner { - @Override - public void startManagement(T runner) - { + boolean doTerminate(); - } + boolean doProvision(); - @Override - public void stopManagement() - { - - } - - @Override - public ScalingStats getStats() - { - return null; - } + ScalingStats getStats(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningSchedulerConfig.java similarity index 97% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerConfig.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningSchedulerConfig.java index e092b1cdfc1..4c7c9b03692 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningSchedulerConfig.java @@ -25,7 +25,7 @@ import org.joda.time.Period; /** */ -public class ResourceManagementSchedulerConfig +public class ProvisioningSchedulerConfig { @JsonProperty private boolean doAutoscale = false; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningService.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningService.java new file mode 100644 index 00000000000..60d53e2b026 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningService.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import java.io.Closeable; + +/** + * The ProvisioningService decides if worker nodes should be provisioned or terminated + * based on the available tasks in the system and the state of the workers in the system. + * + * ProvisioningService is tied to the task runner. + * + * @see ProvisioningStrategy#makeProvisioningService + */ +public interface ProvisioningService extends Closeable +{ + /** + * Should be called from TaskRunner's lifecycle stop + */ + @Override + void close(); + + /** + * Get any interesting stats related to scaling + * + * @return The ScalingStats or `null` if nothing of interest + */ + ScalingStats getStats(); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java similarity index 63% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java index bdf5dc42739..ff5c7ae26f2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java @@ -22,30 +22,15 @@ package io.druid.indexing.overlord.autoscaling; import io.druid.indexing.overlord.TaskRunner; /** - * The ResourceManagementStrategy decides if worker nodes should be provisioned or determined - * based on the available tasks in the system and the state of the workers in the system. * In general, the resource management is tied to the runner. */ -public interface ResourceManagementStrategy +public interface ProvisioningStrategy { /** - * Equivalent to start() but requires a specific runner instance which holds state of interest. - * This method is intended to be called from the TaskRunner's lifecycle + * Creates a new {@link ProvisioningService} for the given {@link TaskRunner} + * This method is intended to be called from the TaskRunner's lifecycle start * * @param runner The TaskRunner state holder this strategy should use during execution */ - void startManagement(T runner); - - /** - * Equivalent to stop() - * Should be called from TaskRunner's lifecycle - */ - void stopManagement(); - - /** - * Get any interesting stats related to scaling - * - * @return The ScalingStats or `null` if nothing of interest - */ - ScalingStats getStats(); + ProvisioningService makeProvisioningService(T runner); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningUtil.java similarity index 93% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningUtil.java index 0659ebff2de..c8405557a6b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ProvisioningUtil.java @@ -24,10 +24,10 @@ import com.google.common.base.Predicate; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.java.util.common.ISE; -public class ResourceManagementUtil +public class ProvisioningUtil { public static Predicate createValidWorkerPredicate( - final SimpleWorkerResourceManagementConfig config + final SimpleWorkerProvisioningConfig config ) { return new Predicate() @@ -45,7 +45,7 @@ public class ResourceManagementUtil } public static Predicate createLazyWorkerPredicate( - final SimpleWorkerResourceManagementConfig config + final SimpleWorkerProvisioningConfig config ) { final Predicate isValidWorker = createValidWorkerPredicate(config); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningConfig.java similarity index 79% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningConfig.java index 47ed29c85e9..624d9882a01 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningConfig.java @@ -24,7 +24,7 @@ import org.joda.time.Period; /** */ -public class SimpleWorkerResourceManagementConfig +public class SimpleWorkerProvisioningConfig { @JsonProperty private Period workerIdleTimeout = new Period("PT90m"); @@ -49,7 +49,7 @@ public class SimpleWorkerResourceManagementConfig return workerIdleTimeout; } - public SimpleWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) + public SimpleWorkerProvisioningConfig setWorkerIdleTimeout(Period workerIdleTimeout) { this.workerIdleTimeout = workerIdleTimeout; return this; @@ -60,7 +60,7 @@ public class SimpleWorkerResourceManagementConfig return maxScalingDuration; } - public SimpleWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) + public SimpleWorkerProvisioningConfig setMaxScalingDuration(Period maxScalingDuration) { this.maxScalingDuration = maxScalingDuration; return this; @@ -71,7 +71,7 @@ public class SimpleWorkerResourceManagementConfig return numEventsToTrack; } - public SimpleWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) + public SimpleWorkerProvisioningConfig setNumEventsToTrack(int numEventsToTrack) { this.numEventsToTrack = numEventsToTrack; return this; @@ -82,7 +82,7 @@ public class SimpleWorkerResourceManagementConfig return pendingTaskTimeout; } - public SimpleWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) + public SimpleWorkerProvisioningConfig setPendingTaskTimeout(Period pendingTaskTimeout) { this.pendingTaskTimeout = pendingTaskTimeout; return this; @@ -93,7 +93,7 @@ public class SimpleWorkerResourceManagementConfig return workerVersion; } - public SimpleWorkerResourceManagementConfig setWorkerVersion(String workerVersion) + public SimpleWorkerProvisioningConfig setWorkerVersion(String workerVersion) { this.workerVersion = workerVersion; return this; @@ -105,7 +105,7 @@ public class SimpleWorkerResourceManagementConfig return workerPort; } - public SimpleWorkerResourceManagementConfig setWorkerPort(int workerPort) + public SimpleWorkerProvisioningConfig setWorkerPort(int workerPort) { this.workerPort = workerPort; return this; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java similarity index 78% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java index 4907ac0baaa..9c2da6e1b37 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java @@ -29,14 +29,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.WorkerTaskRunner; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.Worker; -import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; - import org.joda.time.DateTime; import org.joda.time.Duration; @@ -48,66 +47,83 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy +public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioningStrategy { - private static final EmittingLogger log = new EmittingLogger(SimpleWorkerResourceManagementStrategy.class); + private static final EmittingLogger log = new EmittingLogger(SimpleWorkerProvisioningStrategy.class); - private final SimpleWorkerResourceManagementConfig config; + private final SimpleWorkerProvisioningConfig config; private final Supplier workerConfigRef; - private final ScalingStats scalingStats; - - private final Object lock = new Object(); - private final Set currentlyProvisioning = Sets.newHashSet(); - private final Set currentlyTerminating = Sets.newHashSet(); - - private int targetWorkerCount = -1; - private DateTime lastProvisionTime = new DateTime(); - private DateTime lastTerminateTime = new DateTime(); @Inject - public SimpleWorkerResourceManagementStrategy( - SimpleWorkerResourceManagementConfig config, + public SimpleWorkerProvisioningStrategy( + SimpleWorkerProvisioningConfig config, Supplier workerConfigRef, - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - ScheduledExecutorFactory factory + ProvisioningSchedulerConfig provisioningSchedulerConfig ) { this( config, workerConfigRef, - resourceManagementSchedulerConfig, - factory.create(1, "SimpleResourceManagement-manager--%d") + provisioningSchedulerConfig, + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return ScheduledExecutors.fixed(1, "SimpleResourceManagement-manager--%d"); + } + } ); } - public SimpleWorkerResourceManagementStrategy( - SimpleWorkerResourceManagementConfig config, + public SimpleWorkerProvisioningStrategy( + SimpleWorkerProvisioningConfig config, Supplier workerConfigRef, - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, - ScheduledExecutorService exec + ProvisioningSchedulerConfig provisioningSchedulerConfig, + Supplier execFactory ) { - super(resourceManagementSchedulerConfig, exec); + super(provisioningSchedulerConfig, execFactory); this.config = config; this.workerConfigRef = workerConfigRef; - this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); } - @Override - protected boolean doProvision(WorkerTaskRunner runner) + public Provisioner makeProvisioner(WorkerTaskRunner runner) { - Collection pendingTasks = runner.getPendingTasks(); - Collection workers = getWorkers(runner); - synchronized (lock) { + return new SimpleProvisioner(runner); + } + + private class SimpleProvisioner implements Provisioner + { + private final WorkerTaskRunner runner; + private final ScalingStats scalingStats = new ScalingStats(config.getNumEventsToTrack()); + + private final Set currentlyProvisioning = Sets.newHashSet(); + private final Set currentlyTerminating = Sets.newHashSet(); + + private int targetWorkerCount = -1; + private DateTime lastProvisionTime = new DateTime(); + private DateTime lastTerminateTime = new DateTime(); + + SimpleProvisioner(WorkerTaskRunner runner) + { + this.runner = runner; + } + + @Override + public synchronized boolean doProvision() + { + Collection pendingTasks = runner.getPendingTasks(); + Collection workers = runner.getWorkers(); boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null || workerConfig.getAutoScaler() == null) { - log.warn("No workerConfig available, cannot provision new workers."); + log.error("No workerConfig available, cannot provision new workers."); return false; } - final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); + final Predicate isValidWorker = ProvisioningUtil.createValidWorkerPredicate(config); final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); final List workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup( @@ -134,6 +150,7 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour final AutoScalingData provisioned = workerConfig.getAutoScaler().provision(); final List newNodes; if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) { + log.warn("NewNodes is empty, returning from provision loop"); break; } else { currentlyProvisioning.addAll(newNodes); @@ -146,9 +163,7 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour if (!currentlyProvisioning.isEmpty()) { Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); - log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision); - if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { log.makeAlert("Worker node provisioning taking too long!") .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) @@ -162,13 +177,11 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour return didProvision; } - } - @Override - boolean doTerminate(WorkerTaskRunner runner) - { - Collection pendingTasks = runner.getPendingTasks(); - synchronized (lock) { + @Override + public synchronized boolean doTerminate() + { + Collection pendingTasks = runner.getPendingTasks(); final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null) { log.warn("No workerConfig available, cannot terminate workers."); @@ -203,14 +216,14 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour currentlyTerminating.clear(); currentlyTerminating.addAll(stillExisting); - Collection workers = getWorkers(runner); + Collection workers = runner.getWorkers(); updateTargetWorkerCount(workerConfig, pendingTasks, workers); if (currentlyTerminating.isEmpty()) { final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; if (excessWorkers > 0) { - final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); + final Predicate isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config); final Collection laziestWorkerIps = Collections2.transform( runner.markWorkersLazy(isLazyWorker, excessWorkers), @@ -260,21 +273,19 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour return didTerminate; } - } - private void updateTargetWorkerCount( - final WorkerBehaviorConfig workerConfig, - final Collection pendingTasks, - final Collection zkWorkers - ) - { - synchronized (lock) { + private void updateTargetWorkerCount( + final WorkerBehaviorConfig workerConfig, + final Collection pendingTasks, + final Collection zkWorkers + ) + { final Collection validWorkers = Collections2.filter( zkWorkers, - ResourceManagementUtil.createValidWorkerPredicate(config) + ProvisioningUtil.createValidWorkerPredicate(config) ); - final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); + final Predicate isLazyWorker = ProvisioningUtil.createLazyWorkerPredicate(config); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); @@ -340,11 +351,9 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour ); } } - } - private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) - { - synchronized (lock) { + private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) + { long now = System.currentTimeMillis(); for (TaskRunnerWorkItem pendingTask : pendingTasks) { final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now); @@ -355,16 +364,12 @@ public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResour } return false; } + + @Override + public ScalingStats getStats() + { + return scalingStats; + } } - public Collection getWorkers(WorkerTaskRunner runner) - { - return runner.getWorkers(); - } - - @Override - public ScalingStats getStats() - { - return scalingStats; - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java index f14219013fa..60ef3f24734 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java @@ -39,7 +39,7 @@ import com.google.common.collect.Lists; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.overlord.autoscaling.AutoScaler; import io.druid.indexing.overlord.autoscaling.AutoScalingData; -import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig; import java.util.List; @@ -54,7 +54,7 @@ public class EC2AutoScaler implements AutoScaler private final int maxNumWorkers; private final EC2EnvironmentConfig envConfig; private final AmazonEC2 amazonEC2Client; - private final SimpleWorkerResourceManagementConfig config; + private final SimpleWorkerProvisioningConfig config; @JsonCreator public EC2AutoScaler( @@ -62,7 +62,7 @@ public class EC2AutoScaler implements AutoScaler @JsonProperty("maxNumWorkers") int maxNumWorkers, @JsonProperty("envConfig") EC2EnvironmentConfig envConfig, @JacksonInject AmazonEC2 amazonEC2Client, - @JacksonInject SimpleWorkerResourceManagementConfig config + @JacksonInject SimpleWorkerProvisioningConfig config ) { this.minNumWorkers = minNumWorkers; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java index 8e3312484a2..57e571659d5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java @@ -57,7 +57,7 @@ public class WorkerBehaviorConfig } @JsonProperty - public AutoScaler getAutoScaler() + public AutoScaler getAutoScaler() { return autoScaler; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordBlinkLeadershipTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordBlinkLeadershipTest.java new file mode 100644 index 00000000000..a529c0c85b0 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/OverlordBlinkLeadershipTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.google.common.base.Supplier; +import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class OverlordBlinkLeadershipTest +{ + private RemoteTaskRunnerTestUtils rtrUtils; + private final TestRemoteTaskRunnerConfig remoteTaskRunnerConfig = new TestRemoteTaskRunnerConfig(new Period("PT5M")); + private final WorkerBehaviorConfig defaultWorkerBehaviourConfig = WorkerBehaviorConfig.defaultConfig(); + private final Supplier workerBehaviorConfigSupplier = new Supplier() + { + @Override + public WorkerBehaviorConfig get() + { + return defaultWorkerBehaviourConfig; + } + }; + private final SimpleWorkerProvisioningStrategy resourceManagement = new SimpleWorkerProvisioningStrategy( + new SimpleWorkerProvisioningConfig(), + workerBehaviorConfigSupplier, + new ProvisioningSchedulerConfig() + ); + + @Before + public void setUp() throws Exception + { + rtrUtils = new RemoteTaskRunnerTestUtils(); + rtrUtils.setUp(); + } + + @After + public void tearDown() throws Exception + { + rtrUtils.tearDown(); + } + + /** + * Test that we can start taskRunner, then stop it (emulating "losing leadership", see {@link + * TaskMaster#stopLeading()}), then creating a new taskRunner from {@link + * org.apache.curator.framework.recipes.leader.LeaderSelectorListener#takeLeadership} implementation in + * {@link TaskMaster} and start it again. + */ + @Test(timeout = 10_000) + public void testOverlordBlinkLeadership() + { + try { + RemoteTaskRunner remoteTaskRunner1 = rtrUtils.makeRemoteTaskRunner(remoteTaskRunnerConfig, resourceManagement); + remoteTaskRunner1.stop(); + RemoteTaskRunner remoteTaskRunner2 = rtrUtils.makeRemoteTaskRunner(remoteTaskRunnerConfig, resourceManagement); + remoteTaskRunner2.stop(); + } + catch (Exception e) { + Assert.fail("Should have not thrown any exceptions, thrown: " + e); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java index ff9edb23e8b..e97d9281203 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java @@ -26,9 +26,9 @@ import com.google.common.base.Supplier; import com.metamx.http.client.HttpClient; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.indexing.common.TestUtils; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; -import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -109,8 +109,8 @@ public class RemoteTaskRunnerFactoryTest return ScheduledExecutors.fixed(i, s); } }; - SimpleWorkerResourceManagementConfig resourceManagementConfig = new SimpleWorkerResourceManagementConfig(); - ResourceManagementSchedulerConfig resourceManagementSchedulerConfig = new ResourceManagementSchedulerConfig() + SimpleWorkerProvisioningConfig provisioningConfig = new SimpleWorkerProvisioningConfig(); + ProvisioningSchedulerConfig provisioningSchedulerConfig = new ProvisioningSchedulerConfig() { @Override public boolean isDoAutoscale() @@ -126,19 +126,18 @@ public class RemoteTaskRunnerFactoryTest httpClient, workerBehaviorConfig, executorFactory, - resourceManagementSchedulerConfig, - new SimpleWorkerResourceManagementStrategy( - resourceManagementConfig, + provisioningSchedulerConfig, + new SimpleWorkerProvisioningStrategy( + provisioningConfig, workerBehaviorConfig, - resourceManagementSchedulerConfig, - executorFactory + provisioningSchedulerConfig ) ); - Assert.assertEquals(1, executorCount.get()); + Assert.assertEquals(0, executorCount.get()); RemoteTaskRunner remoteTaskRunner1 = factory.build(); - Assert.assertEquals(2, executorCount.get()); + Assert.assertEquals(1, executorCount.get()); RemoteTaskRunner remoteTaskRunner2 = factory.build(); - Assert.assertEquals(3, executorCount.get()); + Assert.assertEquals(2, executorCount.get()); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index 4b4ac6da9c9..bacfc5097e2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -22,7 +22,6 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Throwables; - import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.PathChildrenCacheFactory; @@ -31,7 +30,8 @@ import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; +import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.TaskAnnouncement; @@ -103,6 +103,15 @@ public class RemoteTaskRunnerTestUtils } RemoteTaskRunner makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception + { + NoopProvisioningStrategy resourceManagement = new NoopProvisioningStrategy<>(); + return makeRemoteTaskRunner(config, resourceManagement); + } + + public RemoteTaskRunner makeRemoteTaskRunner( + RemoteTaskRunnerConfig config, + ProvisioningStrategy provisioningStrategy + ) { RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner( jsonMapper, @@ -122,7 +131,7 @@ public class RemoteTaskRunnerTestUtils null, DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), - new NoopResourceManagementStrategy() + provisioningStrategy ); remoteTaskRunner.start(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java index 3681aa71e7d..831d07ce757 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java @@ -66,7 +66,7 @@ public class EC2AutoScalerTest private DescribeInstancesResult describeInstancesResult; private Reservation reservation; private Instance instance; - private SimpleWorkerResourceManagementConfig managementConfig; + private SimpleWorkerProvisioningConfig managementConfig; @Before public void setUp() throws Exception @@ -81,7 +81,7 @@ public class EC2AutoScalerTest .withImageId(AMI_ID) .withPrivateIpAddress(IP); - managementConfig = new SimpleWorkerResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""); + managementConfig = new SimpleWorkerProvisioningConfig().setWorkerPort(8080).setWorkerVersion(""); } @After diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java similarity index 82% rename from indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java rename to indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index c185b981bf5..3604838ec0d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord.autoscaling; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -59,12 +60,11 @@ import java.util.concurrent.atomic.AtomicReference; /** */ -public class PendingTaskBasedResourceManagementStrategyTest +public class PendingTaskBasedProvisioningStrategyTest { private AutoScaler autoScaler; private Task testTask; - private PendingTaskBasedWorkerResourceManagementConfig config; - private PendingTaskBasedWorkerResourceManagementStrategy strategy; + private PendingTaskBasedWorkerProvisioningStrategy strategy; private AtomicReference workerConfig; private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); private final static String MIN_VERSION = "2014-01-00T00:01:00Z"; @@ -77,7 +77,7 @@ public class PendingTaskBasedResourceManagementStrategyTest testTask = TestTasks.immediateSuccess("task1"); - config = new PendingTaskBasedWorkerResourceManagementConfig() + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) .setNumEventsToTrack(10) .setPendingTaskTimeout(new Period(0)) @@ -91,11 +91,18 @@ public class PendingTaskBasedResourceManagementStrategyTest ) ); - strategy = new PendingTaskBasedWorkerResourceManagementStrategy( + strategy = new PendingTaskBasedWorkerProvisioningStrategy( config, DSuppliers.of(workerConfig), - new ResourceManagementSchedulerConfig(), - executorService + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } ); } @@ -120,10 +127,11 @@ public class PendingTaskBasedResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("aNode")) ).times(3); EasyMock.replay(runner, autoScaler); - boolean provisionedSomething = strategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 3); - for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue(provisioner.getStats().toList().size() == 3); + for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { Assert.assertTrue( event.getEvent() == ScalingStats.EVENT.PROVISION ); @@ -153,10 +161,11 @@ public class PendingTaskBasedResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("aNode")) ).times(2); EasyMock.replay(runner, autoScaler); - boolean provisionedSomething = strategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 2); - for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue(provisioner.getStats().toList().size() == 2); + for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { Assert.assertTrue( event.getEvent() == ScalingStats.EVENT.PROVISION ); @@ -187,10 +196,11 @@ public class PendingTaskBasedResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("aNode")) ).times(2); EasyMock.replay(runner, autoScaler); - boolean provisionedSomething = strategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 2); - for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue(provisioner.getStats().toList().size() == 2); + for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { Assert.assertTrue( event.getEvent() == ScalingStats.EVENT.PROVISION ); @@ -223,22 +233,23 @@ public class PendingTaskBasedResourceManagementStrategyTest EasyMock.replay(runner); EasyMock.replay(autoScaler); - boolean provisionedSomething = strategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 1); - DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - provisionedSomething = strategy.doProvision(runner); + provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp(); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( createdTime.equals(anotherCreatedTime) ); @@ -282,24 +293,25 @@ public class PendingTaskBasedResourceManagementStrategyTest EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); EasyMock.replay(runner); - boolean provisionedSomething = strategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 1); - DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); Thread.sleep(2000); - provisionedSomething = strategy.doProvision(runner); + provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp(); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( createdTime.equals(anotherCreatedTime) ); @@ -343,12 +355,13 @@ public class PendingTaskBasedResourceManagementStrategyTest EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); EasyMock.replay(runner); - boolean terminatedSomething = strategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertTrue(terminatedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); EasyMock.verify(autoScaler); @@ -380,20 +393,21 @@ public class PendingTaskBasedResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = strategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertTrue(terminatedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); - terminatedSomething = strategy.doTerminate(runner); + terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); - Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); EasyMock.verify(autoScaler); @@ -430,7 +444,8 @@ public class PendingTaskBasedResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = strategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -442,7 +457,7 @@ public class PendingTaskBasedResourceManagementStrategyTest .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean provisionedSomething = strategy.doProvision(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); @@ -476,9 +491,8 @@ public class PendingTaskBasedResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = strategy.doTerminate( - runner - ); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -489,9 +503,7 @@ public class PendingTaskBasedResourceManagementStrategyTest EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean provisionedSomething = strategy.doProvision( - runner - ); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); @@ -509,9 +521,7 @@ public class PendingTaskBasedResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("h4")) ); EasyMock.replay(autoScaler); - provisionedSomething = strategy.doProvision( - runner - ); + provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); EasyMock.verify(autoScaler); EasyMock.verify(runner); @@ -536,13 +546,10 @@ public class PendingTaskBasedResourceManagementStrategyTest ).times(2); EasyMock.replay(runner); - boolean terminatedSomething = strategy.doTerminate( - runner - ); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); - boolean provisionedSomething = strategy.doProvision( - runner - ); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(terminatedSomething); Assert.assertFalse(provisionedSomething); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java similarity index 81% rename from indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java rename to indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java index 2d62fb2d5d1..3dfc4201657 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleProvisioningStrategyTest.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord.autoscaling; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -58,11 +59,11 @@ import java.util.concurrent.atomic.AtomicReference; /** */ -public class SimpleResourceManagementStrategyTest +public class SimpleProvisioningStrategyTest { private AutoScaler autoScaler; private Task testTask; - private SimpleWorkerResourceManagementStrategy simpleResourceManagementStrategy; + private SimpleWorkerProvisioningStrategy strategy; private AtomicReference workerConfig; private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); @@ -72,14 +73,14 @@ public class SimpleResourceManagementStrategyTest autoScaler = EasyMock.createMock(AutoScaler.class); testTask = TestTasks.immediateSuccess("task1"); - final SimpleWorkerResourceManagementConfig simpleWorkerResourceManagementConfig = new SimpleWorkerResourceManagementConfig() + final SimpleWorkerProvisioningConfig simpleWorkerProvisioningConfig = new SimpleWorkerProvisioningConfig() .setWorkerIdleTimeout(new Period(0)) .setMaxScalingDuration(new Period(1000)) .setNumEventsToTrack(1) .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(""); - final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig(); + final ProvisioningSchedulerConfig schedulerConfig = new ProvisioningSchedulerConfig(); workerConfig = new AtomicReference<>( new WorkerBehaviorConfig( @@ -88,11 +89,18 @@ public class SimpleResourceManagementStrategyTest ) ); - simpleResourceManagementStrategy = new SimpleWorkerResourceManagementStrategy( - simpleWorkerResourceManagementConfig, + strategy = new SimpleWorkerProvisioningStrategy( + simpleWorkerProvisioningConfig, DSuppliers.of(workerConfig), schedulerConfig, - executorService + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } ); } @@ -126,12 +134,13 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(runner); EasyMock.replay(autoScaler); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); EasyMock.verify(autoScaler); @@ -162,22 +171,23 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(runner); EasyMock.replay(autoScaler); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); - DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( createdTime.equals(anotherCreatedTime) ); @@ -218,24 +228,25 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.replay(runner); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); - DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); Thread.sleep(2000); - provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION ); - DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); Assert.assertTrue( createdTime.equals(anotherCreatedTime) ); @@ -276,12 +287,13 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); EasyMock.replay(runner); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertTrue(terminatedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); EasyMock.verify(autoScaler); @@ -319,20 +331,21 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertTrue(terminatedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); - terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); + terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); - Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + Assert.assertTrue(provisioner.getStats().toList().size() == 1); Assert.assertTrue( - simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); EasyMock.verify(autoScaler); @@ -368,7 +381,8 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -380,7 +394,7 @@ public class SimpleResourceManagementStrategyTest .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); @@ -413,9 +427,8 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.replay(runner); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - runner - ); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -426,9 +439,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - runner - ); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(provisionedSomething); EasyMock.verify(autoScaler); @@ -446,9 +457,7 @@ public class SimpleResourceManagementStrategyTest new AutoScalingData(Lists.newArrayList("h4")) ); EasyMock.replay(autoScaler); - provisionedSomething = simpleResourceManagementStrategy.doProvision( - runner - ); + provisionedSomething = provisioner.doProvision(); Assert.assertTrue(provisionedSomething); EasyMock.verify(autoScaler); EasyMock.verify(runner); @@ -473,13 +482,10 @@ public class SimpleResourceManagementStrategyTest ).times(1); EasyMock.replay(runner); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - runner - ); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean terminatedSomething = provisioner.doTerminate(); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - runner - ); + boolean provisionedSomething = provisioner.doProvision(); Assert.assertFalse(terminatedSomething); Assert.assertFalse(provisionedSomething); diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java index 996bf559c5e..57f93b4e4cb 100644 --- a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java +++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java @@ -21,7 +21,6 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.MapMaker; @@ -38,8 +37,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -106,9 +103,7 @@ public abstract class AbstractCuratorServerInventoryView implemen return jsonMapper.readValue(bytes, typeReference); } catch (IOException e) { - CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray()); - CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)); - log.error(e, "Could not parse json: %s", charBuffer.toString()); + log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes)); throw Throwables.propagate(e); } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 099fdf71e4c..9ae4b523330 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -60,12 +60,12 @@ import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; -import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig; -import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; -import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig; +import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy; +import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.helpers.OverlordHelper; import io.druid.indexing.overlord.helpers.TaskLogAutoCleaner; @@ -230,26 +230,26 @@ public class CliOverlord extends ServerRunnable private void configureAutoscale(Binder binder) { - JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ProvisioningSchedulerConfig.class); JsonConfigProvider.bind( binder, "druid.indexer.autoscale", - PendingTaskBasedWorkerResourceManagementConfig.class + PendingTaskBasedWorkerProvisioningConfig.class ); - JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerResourceManagementConfig.class); + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerProvisioningConfig.class); PolyBind.createChoice( binder, "druid.indexer.autoscale.strategy.type", - Key.get(ResourceManagementStrategy.class), - Key.get(SimpleWorkerResourceManagementStrategy.class) + Key.get(ProvisioningStrategy.class), + Key.get(SimpleWorkerProvisioningStrategy.class) ); - final MapBinder biddy = PolyBind.optionBinder( + final MapBinder biddy = PolyBind.optionBinder( binder, - Key.get(ResourceManagementStrategy.class) + Key.get(ProvisioningStrategy.class) ); - biddy.addBinding("simple").to(SimpleWorkerResourceManagementStrategy.class); - biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class); + biddy.addBinding("simple").to(SimpleWorkerProvisioningStrategy.class); + biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerProvisioningStrategy.class); }