diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index cb279ab088c..1c7345c98b7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -382,6 +382,17 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return ImmutableList.copyOf(pendingTasks.values()); } + public Collection getPendingTaskPayloads() + { + // return a snapshot of current pending task payloads. + return ImmutableList.copyOf(pendingTaskPayloads.values()); + } + + public RemoteTaskRunnerConfig getConfig() + { + return config; + } + @Override public Collection getKnownTasks() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 4901c8c2162..0892d1f11e3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -30,8 +30,8 @@ import io.druid.guice.annotations.Global; import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -44,15 +44,14 @@ import java.util.concurrent.ScheduledExecutorService; public class RemoteTaskRunnerFactory implements TaskRunnerFactory { public static final String TYPE_NAME = "remote"; - private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class); private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; private final IndexerZkConfig zkPaths; private final ObjectMapper jsonMapper; private final HttpClient httpClient; private final Supplier workerConfigRef; - private final SimpleResourceManagementConfig config; private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; + private final ResourceManagementStrategy resourceManagementStrategy; private final ScheduledExecutorFactory factory; @Inject @@ -64,8 +63,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory workerConfigRef, final ScheduledExecutorFactory factory, - final SimpleResourceManagementConfig config, - final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig + final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + final ResourceManagementStrategy resourceManagementStrategy ) { this.curator = curator; @@ -74,25 +73,14 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory resourceManagementStrategy; - if (resourceManagementSchedulerConfig.isDoAutoscale()) { - resourceManagementStrategy = new SimpleResourceManagementStrategy( - config, - workerConfigRef, - resourceManagementSchedulerConfig, - factory.create(1, "RemoteTaskRunner-ResourceManagement--%d") - ); - } else { - resourceManagementStrategy = new NoopResourceManagementStrategy<>(); - } return new RemoteTaskRunner( jsonMapper, remoteTaskRunnerConfig, @@ -105,7 +93,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory() ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java index 01e90a1cdc8..43d283fc0be 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord; import com.google.common.base.Predicate; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import io.druid.indexing.worker.Worker; import java.util.Collection; @@ -44,4 +46,9 @@ public interface WorkerTaskRunner extends TaskRunner * @return */ Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers); + + WorkerTaskRunnerConfig getConfig(); + + Collection getPendingTaskPayloads(); + } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java new file mode 100644 index 00000000000..49270af80eb --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AbstractWorkerResourceManagementStrategy.java @@ -0,0 +1,123 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.emitter.EmittingLogger; +import io.druid.granularity.PeriodGranularity; +import io.druid.indexing.overlord.WorkerTaskRunner; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public abstract class AbstractWorkerResourceManagementStrategy implements ResourceManagementStrategy +{ + private static final EmittingLogger log = new EmittingLogger(AbstractWorkerResourceManagementStrategy.class); + + private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; + private final ScheduledExecutorService exec; + private final Object lock = new Object(); + + private volatile boolean started = false; + + protected AbstractWorkerResourceManagementStrategy( + ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + ScheduledExecutorService exec + ) + { + this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; + this.exec = exec; + } + + @Override + public void startManagement(final WorkerTaskRunner runner) + { + synchronized (lock) { + if (started) { + return; + } + + log.info("Started Resource Management Scheduler"); + + ScheduledExecutors.scheduleAtFixedRate( + exec, + resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + // Any Errors are caught by ScheduledExecutors + doProvision(runner); + } + } + ); + + // Schedule termination of worker nodes periodically + Period period = resourceManagementSchedulerConfig.getTerminatePeriod(); + PeriodGranularity granularity = new PeriodGranularity( + period, + resourceManagementSchedulerConfig.getOriginTime(), + null + ); + final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); + + ScheduledExecutors.scheduleAtFixedRate( + exec, + new Duration(System.currentTimeMillis(), startTime), + resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + // Any Errors are caught by ScheduledExecutors + doTerminate(runner); + } + } + ); + + started = true; + + } + } + + abstract boolean doTerminate(WorkerTaskRunner runner); + + abstract boolean doProvision(WorkerTaskRunner runner); + + @Override + public void stopManagement() + { + synchronized (lock) { + if (!started) { + return; + } + log.info("Stopping Resource Management Scheduler"); + exec.shutdown(); + started = false; + } + } + +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java new file mode 100644 index 00000000000..4d107865797 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Period; + +/** + */ +public class PendingTaskBasedWorkerResourceManagementConfig extends SimpleWorkerResourceManagementConfig +{ + @JsonProperty + private int maxScalingStep = 10; + + + public int getMaxScalingStep() + { + return maxScalingStep; + } + + public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingStep(int maxScalingStep) + { + this.maxScalingStep = maxScalingStep; + return this; + } + + public PendingTaskBasedWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) + { + super.setWorkerIdleTimeout(workerIdleTimeout); + return this; + } + + public PendingTaskBasedWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) + { + super.setMaxScalingDuration(maxScalingDuration); + return this; + } + + public PendingTaskBasedWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) + { + super.setNumEventsToTrack(numEventsToTrack); + return this; + } + + public PendingTaskBasedWorkerResourceManagementConfig setWorkerVersion(String workerVersion) + { + super.setWorkerVersion(workerVersion); + return this; + } + + public PendingTaskBasedWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) + { + super.setPendingTaskTimeout(pendingTaskTimeout); + return this; + } + +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java new file mode 100644 index 00000000000..fe5fd8e774f --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerResourceManagementStrategy.java @@ -0,0 +1,412 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableWorkerInfo; +import io.druid.indexing.overlord.WorkerTaskRunner; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; +import io.druid.indexing.worker.Worker; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class PendingTaskBasedWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy +{ + private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerResourceManagementStrategy.class); + + private final PendingTaskBasedWorkerResourceManagementConfig config; + private final Supplier workerConfigRef; + private final ScalingStats scalingStats; + + private final Object lock = new Object(); + private final Set currentlyProvisioning = Sets.newHashSet(); + private final Set currentlyTerminating = Sets.newHashSet(); + + private DateTime lastProvisionTime = new DateTime(); + private DateTime lastTerminateTime = new DateTime(); + + @Inject + public PendingTaskBasedWorkerResourceManagementStrategy( + PendingTaskBasedWorkerResourceManagementConfig config, + Supplier workerConfigRef, + ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + ScheduledExecutorFactory factory + ) + { + this( + config, + workerConfigRef, + resourceManagementSchedulerConfig, + factory.create(1, "PendingTaskBasedResourceManagement-manager--%d") + ); + } + + public PendingTaskBasedWorkerResourceManagementStrategy( + PendingTaskBasedWorkerResourceManagementConfig config, + Supplier workerConfigRef, + ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + ScheduledExecutorService exec + ) + { + super(resourceManagementSchedulerConfig, exec); + this.config = config; + this.workerConfigRef = workerConfigRef; + this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); + } + + @Override + public boolean doProvision(WorkerTaskRunner runner) + { + Collection pendingTasks = runner.getPendingTaskPayloads(); + Collection workers = runner.getWorkers(); + synchronized (lock) { + boolean didProvision = false; + final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); + if (workerConfig == null || workerConfig.getAutoScaler() == null) { + log.error("No workerConfig available, cannot provision new workers."); + return false; + } + + final Collection workerNodeIds = getWorkerNodeIDs( + Collections2.transform( + workers, + new Function() + { + @Override + public Worker apply(ImmutableWorkerInfo input) + { + return input.getWorker(); + } + } + ), + workerConfig + ); + currentlyProvisioning.removeAll(workerNodeIds); + if (currentlyProvisioning.isEmpty()) { + int want = getScaleUpNodeCount( + runner.getConfig(), + workerConfig, + pendingTasks, + workers + ); + while (want > 0) { + final AutoScalingData provisioned = workerConfig.getAutoScaler().provision(); + final List newNodes = provisioned == null ? ImmutableList.of() : provisioned.getNodeIds(); + if (newNodes.isEmpty()) { + log.warn("NewNodes is empty, returning from provision loop"); + break; + } else { + currentlyProvisioning.addAll(newNodes); + lastProvisionTime = new DateTime(); + scalingStats.addProvisionEvent(provisioned); + want -= provisioned.getNodeIds().size(); + didProvision = true; + } + } + } else { + Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); + log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision); + if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { + log.makeAlert("Worker node provisioning taking too long!") + .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) + .addData("provisioningCount", currentlyProvisioning.size()) + .emit(); + + workerConfig.getAutoScaler().terminateWithIds(Lists.newArrayList(currentlyProvisioning)); + currentlyProvisioning.clear(); + } + } + + return didProvision; + } + } + + private static Collection getWorkerNodeIDs(Collection workers, WorkerBehaviorConfig workerConfig) + { + return workerConfig.getAutoScaler().ipToIdLookup( + Lists.newArrayList( + Iterables.transform( + workers, + new Function() + { + @Override + public String apply(Worker input) + { + return input.getIp(); + } + } + ) + ) + ); + } + + int getScaleUpNodeCount( + final WorkerTaskRunnerConfig remoteTaskRunnerConfig, + final WorkerBehaviorConfig workerConfig, + final Collection pendingTasks, + final Collection workers + ) + { + final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); + final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); + final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); + final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); + + // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need since + // we are not aware of the expectedWorkerCapacity. + int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks( + remoteTaskRunnerConfig, + workerConfig, + pendingTasks, + workers + ); + + int want = Math.max( + minWorkerCount - currValidWorkers, + // Additional workers needed to reach minWorkerCount + Math.min(config.getMaxScalingStep(), moreWorkersNeeded) + // Additional workers needed to run current pending tasks + ); + + if (want > 0 && currValidWorkers >= maxWorkerCount) { + log.warn("Unable to provision more workers. Current workerCount[%d] maximum workerCount[%d]."); + return 0; + } + want = Math.min(want, maxWorkerCount - currValidWorkers); + return want; + } + + int getWorkersNeededToAssignTasks( + final WorkerTaskRunnerConfig workerTaskRunnerConfig, + final WorkerBehaviorConfig workerConfig, + final Collection pendingTasks, + final Collection workers + ) + { + final Collection validWorkers = Collections2.filter( + workers, + ResourceManagementUtil.createValidWorkerPredicate(config) + ); + + Map workersMap = Maps.newHashMap(); + for (ImmutableWorkerInfo worker : validWorkers) { + workersMap.put(worker.getWorker().getHost(), worker); + } + WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); + int need = 0; + int capacity = getExpectedWorkerCapacity(workers); + + // Simulate assigning tasks to dummy workers using configured workerSelectStrategy + // the number of additional workers needed to assign all the pending tasks is noted + for (Task task : pendingTasks) { + Optional selectedWorker = workerSelectStrategy.findWorkerForTask( + workerTaskRunnerConfig, + ImmutableMap.copyOf(workersMap), + task + ); + final ImmutableWorkerInfo workerRunningTask; + if (selectedWorker.isPresent()) { + workerRunningTask = selectedWorker.get(); + } else { + // None of the existing worker can run this task, we need to provision one worker for it. + // create a dummy worker and try to simulate assigning task to it. + workerRunningTask = createDummyWorker("dummy" + need, capacity, workerTaskRunnerConfig.getMinWorkerVersion()); + need++; + } + // Update map with worker running task + workersMap.put(workerRunningTask.getWorker().getHost(), workerWithTask(workerRunningTask, task)); + } + return need; + } + + @Override + public boolean doTerminate(WorkerTaskRunner runner) + { + Collection zkWorkers = runner.getWorkers(); + synchronized (lock) { + final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); + if (workerConfig == null) { + log.warn("No workerConfig available, cannot terminate workers."); + return false; + } + + if (!currentlyProvisioning.isEmpty()) { + log.debug("Already provisioning nodes, Not Terminating any nodes."); + return false; + } + + boolean didTerminate = false; + final Collection workerNodeIds = getWorkerNodeIDs(runner.getLazyWorkers(), workerConfig); + final Set stillExisting = Sets.newHashSet(); + for (String s : currentlyTerminating) { + if (workerNodeIds.contains(s)) { + stillExisting.add(s); + } + } + currentlyTerminating.clear(); + currentlyTerminating.addAll(stillExisting); + + if (currentlyTerminating.isEmpty()) { + final int maxWorkersToTerminate = maxWorkersToTerminate(zkWorkers, workerConfig); + final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); + final List laziestWorkerIps = + Lists.newArrayList( + Collections2.transform( + runner.markWorkersLazy(isLazyWorker, maxWorkersToTerminate), + new Function() + { + @Override + public String apply(Worker zkWorker) + { + return zkWorker.getIp(); + } + } + ) + ); + if (laziestWorkerIps.isEmpty()) { + log.debug("Found no lazy workers"); + } else { + log.info( + "Terminating %,d lazy workers: %s", + laziestWorkerIps.size(), + Joiner.on(", ").join(laziestWorkerIps) + ); + + final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps); + if (terminated != null) { + currentlyTerminating.addAll(terminated.getNodeIds()); + lastTerminateTime = new DateTime(); + scalingStats.addTerminateEvent(terminated); + didTerminate = true; + } + } + } else { + Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); + + log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate); + + if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { + log.makeAlert("Worker node termination taking too long!") + .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) + .addData("terminatingCount", currentlyTerminating.size()) + .emit(); + + currentlyTerminating.clear(); + } + } + + return didTerminate; + } + } + + private int maxWorkersToTerminate(Collection zkWorkers, WorkerBehaviorConfig workerConfig) + { + final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); + final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); + final int invalidWorkers = zkWorkers.size() - currValidWorkers; + final int minWorkers = workerConfig.getAutoScaler().getMinNumWorkers(); + + // Max workers that can be terminated + // All invalid workers + any lazy workers above minCapacity + return invalidWorkers + Math.max( + 0, + Math.min( + config.getMaxScalingStep(), + currValidWorkers - minWorkers + ) + ); + } + + @Override + public ScalingStats getStats() + { + return scalingStats; + } + + private static int getExpectedWorkerCapacity(final Collection workers) + { + int size = workers.size(); + if (size == 0) { + // No existing workers assume capacity per worker as 1 + return 1; + } else { + // Assume all workers have same capacity + return workers.iterator().next().getWorker().getCapacity(); + } + } + + private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task) + { + return new ImmutableWorkerInfo( + immutableWorker.getWorker(), + immutableWorker.getCurrCapacityUsed() + 1, + Sets.union( + immutableWorker.getAvailabilityGroups(), + Sets.newHashSet( + task.getTaskResource() + .getAvailabilityGroup() + ) + ), + Sets.union( + immutableWorker.getRunningTasks(), + Sets.newHashSet( + task.getId() + ) + ), + DateTime.now() + ); + } + + private static ImmutableWorkerInfo createDummyWorker(String host, int capacity, String version) + { + return new ImmutableWorkerInfo( + new Worker(host, "-2", capacity, version), + 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() + ); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java new file mode 100644 index 00000000000..e23f3fdb71b --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.google.common.base.Predicate; +import com.metamx.common.ISE; +import io.druid.indexing.overlord.ImmutableWorkerInfo; +import io.druid.indexing.overlord.ZkWorker; +import io.druid.indexing.worker.Worker; + +public class ResourceManagementUtil +{ + public static Predicate createValidWorkerPredicate( + final SimpleWorkerResourceManagementConfig config + ) + { + return new Predicate() + { + @Override + public boolean apply(ImmutableWorkerInfo worker) + { + final String minVersion = config.getWorkerVersion(); + if (minVersion == null) { + throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database."); + } + return worker.isValidVersion(minVersion); + } + }; + } + + public static Predicate createLazyWorkerPredicate( + final SimpleWorkerResourceManagementConfig config + ) + { + final Predicate isValidWorker = createValidWorkerPredicate(config); + + return new Predicate() + { + @Override + public boolean apply(ImmutableWorkerInfo worker) + { + final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() + >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); + return itHasBeenAWhile || !isValidWorker.apply(worker); + } + }; + } + +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java similarity index 79% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java index 4d8b13f794d..47ed29c85e9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementConfig.java @@ -24,7 +24,7 @@ import org.joda.time.Period; /** */ -public class SimpleResourceManagementConfig +public class SimpleWorkerResourceManagementConfig { @JsonProperty private Period workerIdleTimeout = new Period("PT90m"); @@ -49,7 +49,7 @@ public class SimpleResourceManagementConfig return workerIdleTimeout; } - public SimpleResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) + public SimpleWorkerResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) { this.workerIdleTimeout = workerIdleTimeout; return this; @@ -60,7 +60,7 @@ public class SimpleResourceManagementConfig return maxScalingDuration; } - public SimpleResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) + public SimpleWorkerResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) { this.maxScalingDuration = maxScalingDuration; return this; @@ -71,7 +71,7 @@ public class SimpleResourceManagementConfig return numEventsToTrack; } - public SimpleResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) + public SimpleWorkerResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) { this.numEventsToTrack = numEventsToTrack; return this; @@ -82,7 +82,7 @@ public class SimpleResourceManagementConfig return pendingTaskTimeout; } - public SimpleResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) + public SimpleWorkerResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) { this.pendingTaskTimeout = pendingTaskTimeout; return this; @@ -93,7 +93,7 @@ public class SimpleResourceManagementConfig return workerVersion; } - public SimpleResourceManagementConfig setWorkerVersion(String workerVersion) + public SimpleWorkerResourceManagementConfig setWorkerVersion(String workerVersion) { this.workerVersion = workerVersion; return this; @@ -105,7 +105,7 @@ public class SimpleResourceManagementConfig return workerPort; } - public SimpleResourceManagementConfig setWorkerPort(int workerPort) + public SimpleWorkerResourceManagementConfig setWorkerPort(int workerPort) { this.workerPort = workerPort; return this; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java similarity index 75% rename from indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java rename to indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java index 3d2b1cd43ac..7eead8ce843 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleWorkerResourceManagementStrategy.java @@ -29,11 +29,8 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; -import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; -import io.druid.granularity.PeriodGranularity; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.WorkerTaskRunner; @@ -41,7 +38,6 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; import org.joda.time.Duration; -import org.joda.time.Period; import java.util.Collection; import java.util.List; @@ -50,16 +46,12 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class SimpleResourceManagementStrategy implements ResourceManagementStrategy + +public class SimpleWorkerResourceManagementStrategy extends AbstractWorkerResourceManagementStrategy { - private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); + private static final EmittingLogger log = new EmittingLogger(SimpleWorkerResourceManagementStrategy.class); - private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; - private final ScheduledExecutorService exec; - - private volatile boolean started = false; - - private final SimpleResourceManagementConfig config; + private final SimpleWorkerResourceManagementConfig config; private final Supplier workerConfigRef; private final ScalingStats scalingStats; @@ -72,8 +64,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private DateTime lastTerminateTime = new DateTime(); @Inject - public SimpleResourceManagementStrategy( - SimpleResourceManagementConfig config, + public SimpleWorkerResourceManagementStrategy( + SimpleWorkerResourceManagementConfig config, Supplier workerConfigRef, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorFactory factory @@ -87,21 +79,21 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat ); } - public SimpleResourceManagementStrategy( - SimpleResourceManagementConfig config, + public SimpleWorkerResourceManagementStrategy( + SimpleWorkerResourceManagementConfig config, Supplier workerConfigRef, ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, ScheduledExecutorService exec ) { + super(resourceManagementSchedulerConfig, exec); this.config = config; this.workerConfigRef = workerConfigRef; this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); - this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; - this.exec = exec; } - boolean doProvision(WorkerTaskRunner runner) + + protected boolean doProvision(WorkerTaskRunner runner) { Collection pendingTasks = runner.getPendingTasks(); Collection workers = getWorkers(runner); @@ -112,7 +104,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat log.warn("No workerConfig available, cannot provision new workers."); return false; } - final Predicate isValidWorker = createValidWorkerPredicate(config); + + final Predicate isValidWorker = ResourceManagementUtil.createValidWorkerPredicate(config); final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); final List workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup( @@ -214,16 +207,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; if (excessWorkers > 0) { - final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); final Collection laziestWorkerIps = Collections2.transform( runner.markWorkersLazy(isLazyWorker, excessWorkers), new Function() { @Override - public String apply(Worker zkWorker) + public String apply(Worker worker) { - return zkWorker.getIp(); + return worker.getIp(); } } ); @@ -237,7 +230,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat Joiner.on(", ").join(laziestWorkerIps) ); - final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps)); + final AutoScalingData terminated = workerConfig.getAutoScaler() + .terminate(ImmutableList.copyOf(laziestWorkerIps)); if (terminated != null) { currentlyTerminating.addAll(terminated.getNodeIds()); lastTerminateTime = new DateTime(); @@ -265,111 +259,6 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - @Override - public void startManagement(final WorkerTaskRunner runner) - { - synchronized (lock) { - if (started) { - return; - } - - log.info("Started Resource Management Scheduler"); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - doProvision(runner); - } - } - ); - - // Schedule termination of worker nodes periodically - Period period = resourceManagementSchedulerConfig.getTerminatePeriod(); - PeriodGranularity granularity = new PeriodGranularity( - period, - resourceManagementSchedulerConfig.getOriginTime(), - null - ); - final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - new Duration(System.currentTimeMillis(), startTime), - resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - doTerminate(runner); - } - } - ); - - started = true; - - } - } - - @Override - public void stopManagement() - { - synchronized (lock) { - if (!started) { - return; - } - log.info("Stopping Resource Management Scheduler"); - exec.shutdown(); - started = false; - } - } - - @Override - public ScalingStats getStats() - { - return scalingStats; - } - - private static Predicate createLazyWorkerPredicate( - final SimpleResourceManagementConfig config - ) - { - final Predicate isValidWorker = createValidWorkerPredicate(config); - - return new Predicate() - { - @Override - public boolean apply(ImmutableWorkerInfo worker) - { - final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() - >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); - return itHasBeenAWhile || !isValidWorker.apply(worker); - } - }; - } - - private static Predicate createValidWorkerPredicate( - final SimpleResourceManagementConfig config - ) - { - return new Predicate() - { - @Override - public boolean apply(ImmutableWorkerInfo worker) - { - final String minVersion = config.getWorkerVersion(); - if (minVersion == null) { - throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database."); - } - return worker.isValidVersion(minVersion); - } - }; - } private void updateTargetWorkerCount( final WorkerBehaviorConfig workerConfig, @@ -380,9 +269,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat synchronized (lock) { final Collection validWorkers = Collections2.filter( zkWorkers, - createValidWorkerPredicate(config) + ResourceManagementUtil.createValidWorkerPredicate(config) ); - final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Predicate isLazyWorker = ResourceManagementUtil.createLazyWorkerPredicate(config); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); @@ -469,4 +358,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat { return runner.getWorkers(); } + + @Override + public ScalingStats getStats() + { + return scalingStats; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java index bbc5f9eab1e..f14219013fa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ec2/EC2AutoScaler.java @@ -39,7 +39,7 @@ import com.google.common.collect.Lists; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.overlord.autoscaling.AutoScaler; import io.druid.indexing.overlord.autoscaling.AutoScalingData; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; import java.util.List; @@ -54,7 +54,7 @@ public class EC2AutoScaler implements AutoScaler private final int maxNumWorkers; private final EC2EnvironmentConfig envConfig; private final AmazonEC2 amazonEC2Client; - private final SimpleResourceManagementConfig config; + private final SimpleWorkerResourceManagementConfig config; @JsonCreator public EC2AutoScaler( @@ -62,7 +62,7 @@ public class EC2AutoScaler implements AutoScaler @JsonProperty("maxNumWorkers") int maxNumWorkers, @JsonProperty("envConfig") EC2EnvironmentConfig envConfig, @JacksonInject AmazonEC2 amazonEC2Client, - @JacksonInject SimpleResourceManagementConfig config + @JacksonInject SimpleWorkerResourceManagementConfig config ) { this.minNumWorkers = minNumWorkers; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index f6971a5d187..e2321964bf8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -28,7 +28,7 @@ import javax.validation.constraints.NotNull; /** */ -public class RemoteTaskRunnerConfig +public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig { @JsonProperty @NotNull @@ -38,9 +38,6 @@ public class RemoteTaskRunnerConfig @NotNull private Period taskCleanupTimeout = new Period("PT15M"); - @JsonProperty - private String minWorkerVersion = "0"; - @JsonProperty @Min(10 * 1024) private int maxZnodeBytes = CuratorUtils.DEFAULT_MAX_ZNODE_BYTES; @@ -62,11 +59,6 @@ public class RemoteTaskRunnerConfig return taskCleanupTimeout; } - public String getMinWorkerVersion() - { - return minWorkerVersion; - } - public int getMaxZnodeBytes() { return maxZnodeBytes; @@ -107,7 +99,7 @@ public class RemoteTaskRunnerConfig if (!taskCleanupTimeout.equals(that.taskCleanupTimeout)) { return false; } - if (!minWorkerVersion.equals(that.minWorkerVersion)) { + if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) { return false; } return taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout); @@ -119,7 +111,7 @@ public class RemoteTaskRunnerConfig { int result = taskAssignmentTimeout.hashCode(); result = 31 * result + taskCleanupTimeout.hashCode(); - result = 31 * result + minWorkerVersion.hashCode(); + result = 31 * result + getMinWorkerVersion().hashCode(); result = 31 * result + maxZnodeBytes; result = 31 * result + taskShutdownLinkTimeout.hashCode(); result = 31 * result + pendingTasksRunnerNumThreads; @@ -132,7 +124,7 @@ public class RemoteTaskRunnerConfig return "RemoteTaskRunnerConfig{" + "taskAssignmentTimeout=" + taskAssignmentTimeout + ", taskCleanupTimeout=" + taskCleanupTimeout + - ", minWorkerVersion='" + minWorkerVersion + '\'' + + ", minWorkerVersion='" + getMinWorkerVersion() + '\'' + ", maxZnodeBytes=" + maxZnodeBytes + ", taskShutdownLinkTimeout=" + taskShutdownLinkTimeout + ", pendingTasksRunnerNumThreads=" + pendingTasksRunnerNumThreads + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java new file mode 100644 index 00000000000..0a4045c08ae --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.config; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class WorkerTaskRunnerConfig +{ + @JsonProperty + private String minWorkerVersion = "0"; + + public String getMinWorkerVersion() + { + return minWorkerVersion; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java index d96038f0136..a193d4b9b87 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java @@ -26,6 +26,7 @@ import com.google.common.primitives.Ints; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import java.util.Comparator; import java.util.TreeSet; @@ -36,7 +37,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate { @Override public Optional findWorkerForTask( - RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task + WorkerTaskRunnerConfig config, ImmutableMap zkWorkers, Task task ) { final TreeSet sortedWorkers = Sets.newTreeSet( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java index df7487f7d75..be7fc0d4541 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java @@ -27,6 +27,7 @@ import com.google.common.collect.Sets; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import java.util.List; import java.util.Set; @@ -59,7 +60,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo @Override public Optional findWorkerForTask( - final RemoteTaskRunnerConfig config, + final WorkerTaskRunnerConfig config, final ImmutableMap zkWorkers, final Task task ) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 1116e70bf1d..fbf7a40f646 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -26,6 +26,7 @@ import com.google.common.primitives.Ints; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import java.util.Comparator; import java.util.TreeSet; @@ -36,7 +37,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy { @Override public Optional findWorkerForTask( - final RemoteTaskRunnerConfig config, + final WorkerTaskRunnerConfig config, final ImmutableMap zkWorkers, final Task task ) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java index e22f8aeb051..2555179a30c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; import javax.script.Compilable; import javax.script.Invocable; @@ -39,7 +40,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy { public static interface SelectorFunction { - public String apply(RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task); + public String apply(WorkerTaskRunnerConfig config, ImmutableMap zkWorkers, Task task); } private final SelectorFunction fnSelector; @@ -62,7 +63,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy @Override public Optional findWorkerForTask( - RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task + WorkerTaskRunnerConfig config, ImmutableMap zkWorkers, Task task ) { String worker = fnSelector.apply(config, zkWorkers, task); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index c9127793744..1e0cec76047 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig; /** * The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to. @@ -49,7 +50,7 @@ public interface WorkerSelectStrategy * @return A {@link io.druid.indexing.overlord.ImmutableWorkerInfo} to run the task if one is available. */ Optional findWorkerForTask( - final RemoteTaskRunnerConfig config, + final WorkerTaskRunnerConfig config, final ImmutableMap zkWorkers, final Task task ); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java index fe6020ae4c5..5b7fbc37c44 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java @@ -29,10 +29,10 @@ import com.metamx.http.client.HttpClient; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.indexing.common.TestUtils; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import io.druid.jackson.DefaultObjectMapper; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import junit.framework.Assert; @@ -109,7 +109,7 @@ public class RemoteTaskRunnerFactoryTest return ScheduledExecutors.fixed(i, s); } }; - SimpleResourceManagementConfig resourceManagementConfig = new SimpleResourceManagementConfig(); + SimpleWorkerResourceManagementConfig resourceManagementConfig = new SimpleWorkerResourceManagementConfig(); ResourceManagementSchedulerConfig resourceManagementSchedulerConfig = new ResourceManagementSchedulerConfig() { @Override @@ -126,14 +126,19 @@ public class RemoteTaskRunnerFactoryTest httpClient, workerBehaviorConfig, executorFactory, - resourceManagementConfig, - resourceManagementSchedulerConfig + resourceManagementSchedulerConfig, + new SimpleWorkerResourceManagementStrategy( + resourceManagementConfig, + workerBehaviorConfig, + resourceManagementSchedulerConfig, + executorFactory + ) ); - Assert.assertEquals(0, executorCount.get()); + Assert.assertEquals(1, executorCount.get()); RemoteTaskRunner remoteTaskRunner1 = factory.build(); Assert.assertEquals(2, executorCount.get()); RemoteTaskRunner remoteTaskRunner2 = factory.build(); - Assert.assertEquals(4, executorCount.get()); + Assert.assertEquals(3, executorCount.get()); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java index 85ff55e48d2..bd131e006f6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerTest.java @@ -66,7 +66,7 @@ public class EC2AutoScalerTest private DescribeInstancesResult describeInstancesResult; private Reservation reservation; private Instance instance; - private SimpleResourceManagementConfig managementConfig; + private SimpleWorkerResourceManagementConfig managementConfig; @Before public void setUp() throws Exception @@ -81,7 +81,7 @@ public class EC2AutoScalerTest .withImageId(AMI_ID) .withPrivateIpAddress(IP); - managementConfig = new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""); + managementConfig = new SimpleWorkerResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""); } @After diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java new file mode 100644 index 00000000000..e9d08675a3b --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/PendingTaskBasedResourceManagementStrategyTest.java @@ -0,0 +1,604 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; +import io.druid.common.guava.DSuppliers; +import io.druid.concurrent.Execs; +import io.druid.indexing.common.TaskLocation; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TestTasks; +import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableWorkerInfo; +import io.druid.indexing.overlord.RemoteTaskRunner; +import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; +import io.druid.indexing.overlord.ZkWorker; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.indexing.worker.TaskAnnouncement; +import io.druid.indexing.worker.Worker; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class PendingTaskBasedResourceManagementStrategyTest +{ + private AutoScaler autoScaler; + private Task testTask; + private PendingTaskBasedWorkerResourceManagementConfig config; + private PendingTaskBasedWorkerResourceManagementStrategy strategy; + private AtomicReference workerConfig; + private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); + private final static String MIN_VERSION = "2014-01-00T00:01:00Z"; + private final static String INVALID_VERSION = "0"; + + @Before + public void setUp() throws Exception + { + autoScaler = EasyMock.createMock(AutoScaler.class); + + testTask = TestTasks.immediateSuccess("task1"); + + config = new PendingTaskBasedWorkerResourceManagementConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2); + + workerConfig = new AtomicReference<>( + new WorkerBehaviorConfig( + new FillCapacityWorkerSelectStrategy(), + autoScaler + ) + ); + + strategy = new PendingTaskBasedWorkerResourceManagementStrategy( + config, + DSuppliers.of(workerConfig), + new ResourceManagementSchedulerConfig(), + executorService + ); + } + + @Test + public void testSuccessfulInitialMinWorkersProvision() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Lists.newArrayList() + ); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + ) + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("aNode")) + ).times(3); + EasyMock.replay(runner, autoScaler); + boolean provisionedSomething = strategy.doProvision(runner); + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 3); + for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue( + event.getEvent() == ScalingStats.EVENT.PROVISION + ); + } + } + + @Test + public void testSuccessfulMinWorkersProvision() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Lists.newArrayList() + ); + // 1 node already running, only provision 2 more. + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable() + ) + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("aNode")) + ).times(2); + EasyMock.replay(runner, autoScaler); + boolean provisionedSomething = strategy.doProvision(runner); + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 2); + for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue( + event.getEvent() == ScalingStats.EVENT.PROVISION + ); + } + } + + @Test + public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Lists.newArrayList() + ); + // 1 node already running, only provision 2 more. + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node + ) + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("aNode")) + ).times(2); + EasyMock.replay(runner, autoScaler); + boolean provisionedSomething = strategy.doProvision(runner); + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 2); + for (ScalingStats.ScalingEvent event : strategy.getStats().toList()) { + Assert.assertTrue( + event.getEvent() == ScalingStats.EVENT.PROVISION + ); + } + } + + @Test + public void testSomethingProvisioning() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("fake")) + ); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Arrays.asList( + NoopTask.create() + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node + ) + ).times(2); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + boolean provisionedSomething = strategy.doProvision(runner); + + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 1); + DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + + provisionedSomething = strategy.doProvision(runner); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testProvisionAlert() throws Exception + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + emitter.emit(EasyMock.anyObject()); + EasyMock.expectLastCall(); + EasyMock.replay(emitter); + + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(autoScaler.terminateWithIds(EasyMock.>anyObject())) + .andReturn(null); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("fake")) + ); + EasyMock.replay(autoScaler); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Arrays.asList( + NoopTask.create() + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask, "hi", "lo", MIN_VERSION, 1).toImmutable(), + new TestZkWorker(testTask, "h1", "n1", INVALID_VERSION).toImmutable(), // Invalid version node + new TestZkWorker(testTask, "h2", "n1", INVALID_VERSION).toImmutable() // Invalid version node + ) + ).times(2); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.replay(runner); + + boolean provisionedSomething = strategy.doProvision(runner); + + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 1); + DateTime createdTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + + Thread.sleep(2000); + + provisionedSomething = strategy.doProvision(runner); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = strategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(emitter); + EasyMock.verify(runner); + } + + @Test + public void testDoSuccessfulTerminate() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()); + EasyMock.expect(autoScaler.terminate(EasyMock.>anyObject())).andReturn( + new AutoScalingData(Lists.newArrayList()) + ); + EasyMock.replay(autoScaler); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTasks()).andReturn( + Arrays.asList( + new RemoteTaskRunnerWorkItem( + testTask.getId(), + null, + TaskLocation.unknown() + ).withQueueInsertionTime(new DateTime()) + ) + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable() + ) + ).times(2); + EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn( + Arrays.asList( + new TestZkWorker(testTask).getWorker() + ) + ); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.replay(runner); + + boolean terminatedSomething = strategy.doTerminate(runner); + + Assert.assertTrue(terminatedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + ); + + EasyMock.verify(autoScaler); + } + + @Test + public void testSomethingTerminating() throws Exception + { + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")).times(2); + EasyMock.expect(autoScaler.terminate(EasyMock.>anyObject())).andReturn( + new AutoScalingData(Lists.newArrayList("ip")) + ); + EasyMock.replay(autoScaler); + + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable() + ) + ).times(2); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable().getWorker() + ) + ); + EasyMock.replay(runner); + + boolean terminatedSomething = strategy.doTerminate(runner); + + Assert.assertTrue(terminatedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + ); + + terminatedSomething = strategy.doTerminate(runner); + + Assert.assertFalse(terminatedSomething); + Assert.assertTrue(strategy.getStats().toList().size() == 1); + Assert.assertTrue( + strategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testNoActionNeeded() throws Exception + { + EasyMock.reset(autoScaler); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScaler); + + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Arrays.asList( + (Task) NoopTask.create() + ) + ).times(1); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(NoopTask.create()).toImmutable(), + new TestZkWorker(NoopTask.create()).toImmutable() + ) + ).times(2); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.emptyList() + ); + EasyMock.replay(runner); + + boolean terminatedSomething = strategy.doTerminate(runner); + + Assert.assertFalse(terminatedSomething); + EasyMock.verify(autoScaler); + + EasyMock.reset(autoScaler); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScaler); + + boolean provisionedSomething = strategy.doProvision(runner); + + Assert.assertFalse(provisionedSomething); + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testMinCountIncrease() throws Exception + { + // Don't terminate anything + EasyMock.reset(autoScaler); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScaler); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Arrays.asList() + ).times(2); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(NoopTask.create(), "h1", "i1", MIN_VERSION).toImmutable() + ) + ).times(3); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(2); + + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.emptyList() + ); + EasyMock.replay(runner); + + boolean terminatedSomething = strategy.doTerminate( + runner + ); + Assert.assertFalse(terminatedSomething); + EasyMock.verify(autoScaler); + + // Don't provision anything + EasyMock.reset(autoScaler); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScaler); + boolean provisionedSomething = strategy.doProvision( + runner + ); + Assert.assertFalse(provisionedSomething); + EasyMock.verify(autoScaler); + + EasyMock.reset(autoScaler); + // Increase minNumWorkers + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("h3")) + ); + // Should provision two new workers + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("h4")) + ); + EasyMock.replay(autoScaler); + provisionedSomething = strategy.doProvision( + runner + ); + Assert.assertTrue(provisionedSomething); + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testNullWorkerConfig() throws Exception + { + workerConfig.set(null); + EasyMock.replay(autoScaler); + + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + Arrays.asList( + NoopTask.create() + ) + ).times(1); + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(null).toImmutable() + ) + ).times(2); + EasyMock.replay(runner); + + boolean terminatedSomething = strategy.doTerminate( + runner + ); + + boolean provisionedSomething = strategy.doProvision( + runner + ); + + Assert.assertFalse(terminatedSomething); + Assert.assertFalse(provisionedSomething); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + private static class TestZkWorker extends ZkWorker + { + private final Task testTask; + + public TestZkWorker( + Task testTask + ) + { + this(testTask, "host", "ip", MIN_VERSION); + } + + public TestZkWorker( + Task testTask, + String host, + String ip, + String version + ) + { + this(testTask, host, ip, version, 1); + } + + public TestZkWorker( + Task testTask, + String host, + String ip, + String version, + int capacity + ) + { + super(new Worker(host, ip, capacity, version), null, new DefaultObjectMapper()); + + this.testTask = testTask; + } + + @Override + public Map getRunningTasks() + { + if (testTask == null) { + return Maps.newHashMap(); + } + return ImmutableMap.of( + testTask.getId(), + TaskAnnouncement.create( + testTask, + TaskStatus.running(testTask.getId()), + TaskLocation.unknown() + ) + ); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index ce681f84e80..bded8398209 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -62,7 +62,7 @@ public class SimpleResourceManagementStrategyTest { private AutoScaler autoScaler; private Task testTask; - private SimpleResourceManagementStrategy simpleResourceManagementStrategy; + private SimpleWorkerResourceManagementStrategy simpleResourceManagementStrategy; private AtomicReference workerConfig; private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); @@ -72,7 +72,7 @@ public class SimpleResourceManagementStrategyTest autoScaler = EasyMock.createMock(AutoScaler.class); testTask = TestTasks.immediateSuccess("task1"); - final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig() + final SimpleWorkerResourceManagementConfig simpleWorkerResourceManagementConfig = new SimpleWorkerResourceManagementConfig() .setWorkerIdleTimeout(new Period(0)) .setMaxScalingDuration(new Period(1000)) .setNumEventsToTrack(1) @@ -88,8 +88,8 @@ public class SimpleResourceManagementStrategyTest ) ); - simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( - simpleResourceManagementConfig, + simpleResourceManagementStrategy = new SimpleWorkerResourceManagementStrategy( + simpleWorkerResourceManagementConfig, DSuppliers.of(workerConfig), schedulerConfig, executorService diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index ae049e8a68b..3ba425237a7 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.metamx.common.ISE; -import com.metamx.common.guava.Comparators; import com.metamx.emitter.EmittingLogger; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentChangeRequestDrop; @@ -39,7 +38,6 @@ import org.apache.zookeeper.data.Stat; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; @@ -69,7 +67,7 @@ public class LoadQueuePeon private final CuratorFramework curator; private final String basePath; private final ObjectMapper jsonMapper; - private final ScheduledExecutorService zkWritingExecutor; + private final ScheduledExecutorService processingExecutor; private final ExecutorService callBackExecutor; private final DruidCoordinatorConfig config; @@ -92,7 +90,7 @@ public class LoadQueuePeon CuratorFramework curator, String basePath, ObjectMapper jsonMapper, - ScheduledExecutorService zkWritingExecutor, + ScheduledExecutorService processingExecutor, ExecutorService callbackExecutor, DruidCoordinatorConfig config ) @@ -101,7 +99,7 @@ public class LoadQueuePeon this.basePath = basePath; this.jsonMapper = jsonMapper; this.callBackExecutor = callbackExecutor; - this.zkWritingExecutor = zkWritingExecutor; + this.processingExecutor = processingExecutor; this.config = config; } @@ -202,7 +200,7 @@ public class LoadQueuePeon return; } - zkWritingExecutor.execute( + processingExecutor.execute( new Runnable() { @Override @@ -225,7 +223,7 @@ public class LoadQueuePeon final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - zkWritingExecutor.schedule( + processingExecutor.schedule( new Runnable() { @Override diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 1ce74669b56..41efbdc23f5 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -61,10 +61,12 @@ import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.WorkerTaskRunner; +import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; -import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordResource; @@ -144,8 +146,8 @@ public class CliOverlord extends ServerRunnable binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); configureTaskStorage(binder); - configureRunners(binder); configureAutoscale(binder); + configureRunners(binder); binder.bind(AuditManager.class) .toProvider(AuditManagerProvider.class) @@ -207,11 +209,26 @@ public class CliOverlord extends ServerRunnable private void configureAutoscale(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); - binder.bind(new TypeLiteral>(){}) - .to(SimpleResourceManagementStrategy.class) - .in(LazySingleton.class); + JsonConfigProvider.bind( + binder, + "druid.indexer.autoscale", + PendingTaskBasedWorkerResourceManagementConfig.class + ); + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleWorkerResourceManagementConfig.class); + + PolyBind.createChoice( + binder, + "druid.indexer.autoscale.strategy.type", + Key.get(ResourceManagementStrategy.class), + Key.get(SimpleWorkerResourceManagementStrategy.class) + ); + final MapBinder biddy = PolyBind.optionBinder( + binder, + Key.get(ResourceManagementStrategy.class) + ); + biddy.addBinding("simple").to(SimpleWorkerResourceManagementStrategy.class); + biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class); - JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } }, new IndexingServiceFirehoseModule(),