diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 6a2042bb291..ff69dbe7b2f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -52,6 +52,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.tasklogs.LogUtils; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.query.DruidMetrics; @@ -542,9 +543,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } @Override - public Collection getWorkers() + public Optional getScalingStats() { - return ImmutableList.of(); + return Optional.absent(); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java index 908d94a7710..aa9161638db 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunnerFactory.java @@ -31,8 +31,8 @@ import io.druid.tasklogs.TaskLogPusher; import java.util.Properties; /** -*/ -public class ForkingTaskRunnerFactory implements TaskRunnerFactory + */ +public class ForkingTaskRunnerFactory implements TaskRunnerFactory { private final ForkingTaskRunnerConfig config; private final TaskConfig taskConfig; @@ -51,7 +51,8 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory final ObjectMapper jsonMapper, final TaskLogPusher persistentTaskLogs, @Self DruidNode node - ) { + ) + { this.config = config; this.taskConfig = taskConfig; this.workerConfig = workerConfig; @@ -62,7 +63,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory } @Override - public TaskRunner build() + public ForkingTaskRunner build() { return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index c88878f358e..90cebd775f0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -52,9 +52,12 @@ import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.concurrent.Execs; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; @@ -102,7 +105,6 @@ import java.util.concurrent.TimeUnit; *

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * fail. The RemoteTaskRunner depends on another component to create additional worker resources. - * For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties. *

* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the * worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up. @@ -149,6 +151,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final ListeningScheduledExecutorService cleanupExec; private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>(); + private final ResourceManagementStrategy resourceManagement; public RemoteTaskRunner( ObjectMapper jsonMapper, @@ -158,7 +161,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer PathChildrenCacheFactory pathChildrenCacheFactory, HttpClient httpClient, Supplier workerConfigRef, - ScheduledExecutorService cleanupExec + ScheduledExecutorService cleanupExec, + ResourceManagementStrategy resourceManagement ) { this.jsonMapper = jsonMapper; @@ -171,6 +175,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer this.httpClient = httpClient; this.workerConfigRef = workerConfigRef; this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec); + this.resourceManagement = resourceManagement; } @LifecycleStart @@ -283,6 +288,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer waitingForMonitor.wait(); } } + resourceManagement.startManagement(this); started = true; } catch (Exception e) { @@ -298,6 +304,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return; } started = false; + + resourceManagement.stopManagement(); + for (ZkWorker zkWorker : zkWorkers.values()) { zkWorker.close(); } @@ -314,7 +323,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return ImmutableList.of(); } - @Override public Collection getWorkers() { return ImmutableList.copyOf(zkWorkers.values()); @@ -339,6 +347,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values())); } + @Override + public Optional getScalingStats() + { + return Optional.of(resourceManagement.getStats()); + } + public ZkWorker findWorkerRunningTask(String taskId) { for (ZkWorker zkWorker : zkWorkers.values()) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 52d68030b8b..9f94635f0b3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -23,9 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.guice.annotations.Global; +import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ScalingStats; +import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; +import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -35,8 +42,9 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class RemoteTaskRunnerFactory implements TaskRunnerFactory +public class RemoteTaskRunnerFactory implements TaskRunnerFactory { + private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class); private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; private final IndexerZkConfig zkPaths; @@ -44,6 +52,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory private final HttpClient httpClient; private final Supplier workerConfigRef; private final ScheduledExecutorService cleanupExec; + private final SimpleResourceManagementConfig config; + private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; + private final ScheduledExecutorService exec; @Inject public RemoteTaskRunnerFactory( @@ -53,7 +64,10 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory final ObjectMapper jsonMapper, @Global final HttpClient httpClient, final Supplier workerConfigRef, - ScheduledExecutorFactory factory + final ScheduledExecutorFactory factory, + final SimpleResourceManagementConfig config, + final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + final ScheduledExecutorService exec ) { this.curator = curator; @@ -62,12 +76,26 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.workerConfigRef = workerConfigRef; - this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d"); + this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"); + this.config = config; + this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; + this.exec = exec; } @Override - public TaskRunner build() + public RemoteTaskRunner build() { + final ResourceManagementStrategy resourceManagementStrategy; + if (resourceManagementSchedulerConfig.isDoAutoscale()) { + resourceManagementStrategy = new SimpleResourceManagementStrategy( + config, + workerConfigRef, + resourceManagementSchedulerConfig, + exec + ); + } else { + resourceManagementStrategy = new NoopResourceManagementStrategy<>(); + } return new RemoteTaskRunner( jsonMapper, remoteTaskRunnerConfig, @@ -79,7 +107,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory .build(), httpClient, workerConfigRef, - cleanupExec + cleanupExec, + resourceManagementStrategy ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 96f90a610ad..a9792200d95 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -22,8 +22,6 @@ package io.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -34,8 +32,7 @@ import io.druid.guice.annotations.Self; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.server.DruidNode; import io.druid.server.initialization.IndexerZkConfig; @@ -64,7 +61,6 @@ public class TaskMaster private volatile boolean leading = false; private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; - private volatile ResourceManagementScheduler resourceManagementScheduler; private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); @@ -77,7 +73,6 @@ public class TaskMaster @Self final DruidNode node, final IndexerZkConfig zkPaths, final TaskRunnerFactory runnerFactory, - final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, final ServiceEmitter emitter @@ -118,14 +113,6 @@ public class TaskMaster .emit(); } leaderLifecycle.addManagedInstance(taskRunner); - if (taskRunner instanceof RemoteTaskRunner) { - final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle); - resourceManagementScheduler = managementSchedulerFactory.build( - (RemoteTaskRunner) taskRunner, - executorFactory - ); - leaderLifecycle.addManagedInstance(resourceManagementScheduler); - } leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addHandler( new Lifecycle.Handler() @@ -285,10 +272,10 @@ public class TaskMaster } } - public Optional getResourceManagementScheduler() + public Optional getScalingStats() { if (leading) { - return Optional.fromNullable(resourceManagementScheduler); + return taskRunner.getScalingStats(); } else { return Optional.absent(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java index 28e07e91807..8802c2cd297 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java @@ -19,10 +19,12 @@ package io.druid.indexing.overlord; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.Pair; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import java.util.Collection; import java.util.List; @@ -45,7 +47,7 @@ public interface TaskRunner * * @return task status, eventually */ - public ListenableFuture run(Task task); + ListenableFuture run(Task task); /** * Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any @@ -53,19 +55,24 @@ public interface TaskRunner * * @param taskid task ID to clean up resources for */ - public void shutdown(String taskid); + void shutdown(String taskid); /** * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling * stopping, "run" will not accept further tasks. */ - public void stop(); + void stop(); - public Collection getRunningTasks(); + Collection getRunningTasks(); - public Collection getPendingTasks(); + Collection getPendingTasks(); - public Collection getKnownTasks(); + Collection getKnownTasks(); - public Collection getWorkers(); + /** + * Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities + * + * @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise + */ + Optional getScalingStats(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerFactory.java index 68120b17c67..20625d16a01 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerFactory.java @@ -19,7 +19,7 @@ package io.druid.indexing.overlord; -public interface TaskRunnerFactory +public interface TaskRunnerFactory { - public TaskRunner build(); + T build(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 92a5843217b..da855f16891 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -19,11 +19,11 @@ package io.druid.indexing.overlord; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -33,7 +33,6 @@ import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.concurrent.Execs; @@ -42,6 +41,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -204,9 +204,9 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker } @Override - public Collection getWorkers() + public Optional getScalingStats() { - return Lists.newArrayList(); + return Optional.absent(); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AutoScaler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AutoScaler.java index 5b571b4a756..ca1d170f8f0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AutoScaler.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/AutoScaler.java @@ -34,17 +34,17 @@ import java.util.List; }) public interface AutoScaler { - public int getMinNumWorkers(); + int getMinNumWorkers(); - public int getMaxNumWorkers(); + int getMaxNumWorkers(); - public T getEnvConfig(); + T getEnvConfig(); - public AutoScalingData provision(); + AutoScalingData provision(); - public AutoScalingData terminate(List ips); + AutoScalingData terminate(List ips); - public AutoScalingData terminateWithIds(List ids); + AutoScalingData terminateWithIds(List ids); /** * Provides a lookup of ip addresses to node ids @@ -53,7 +53,7 @@ public interface AutoScaler * * @return node ids */ - public List ipToIdLookup(List ips); + List ipToIdLookup(List ips); /** * Provides a lookup of node ids to ip addresses @@ -62,5 +62,5 @@ public interface AutoScaler * * @return IPs associated with the node */ - public List idToIpLookup(List nodeIds); + List idToIpLookup(List nodeIds); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementScheduler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementScheduler.java deleted file mode 100644 index 4e55bba41b1..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementScheduler.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord.autoscaling; - -import com.metamx.common.logger.Logger; - -/** - */ -public class NoopResourceManagementScheduler extends ResourceManagementScheduler -{ - private static final Logger log = new Logger(NoopResourceManagementScheduler.class); - - public NoopResourceManagementScheduler() - { - super(null, null, null, null); - } - - @Override - public void start() - { - log.info("Autoscaling is disabled."); - } - - @Override - public void stop() - { - // do nothing - } - - @Override - public ScalingStats getStats() - { - return new ScalingStats(0); - } -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java new file mode 100644 index 00000000000..a96825e7949 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/NoopResourceManagementStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.autoscaling; + +import io.druid.indexing.overlord.TaskRunner; + +public class NoopResourceManagementStrategy implements ResourceManagementStrategy +{ + @Override + public void startManagement(T runner) + { + + } + + @Override + public void stopManagement() + { + + } + + @Override + public ScalingStats getStats() + { + return null; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java deleted file mode 100644 index 5346873ff26..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementScheduler.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord.autoscaling; - -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; -import io.druid.granularity.PeriodGranularity; -import io.druid.indexing.overlord.RemoteTaskRunner; -import org.joda.time.DateTime; -import org.joda.time.Duration; -import org.joda.time.Period; - -import java.util.concurrent.ScheduledExecutorService; - -/** - * The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed. - * The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually - * occur. That decision is made in the {@link ResourceManagementStrategy}. - */ -public class ResourceManagementScheduler -{ - private static final Logger log = new Logger(ResourceManagementScheduler.class); - - private final RemoteTaskRunner taskRunner; - private final ResourceManagementStrategy resourceManagementStrategy; - private final ResourceManagementSchedulerConfig config; - private final ScheduledExecutorService exec; - - private final Object lock = new Object(); - private volatile boolean started = false; - - public ResourceManagementScheduler( - RemoteTaskRunner taskRunner, - ResourceManagementStrategy resourceManagementStrategy, - ResourceManagementSchedulerConfig config, - ScheduledExecutorService exec - ) - { - this.taskRunner = taskRunner; - this.resourceManagementStrategy = resourceManagementStrategy; - this.config = config; - this.exec = exec; - } - - @LifecycleStart - public void start() - { - synchronized (lock) { - if (started) { - return; - } - - log.info("Started Resource Management Scheduler"); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - config.getProvisionPeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - resourceManagementStrategy.doProvision(taskRunner); - } - } - ); - - // Schedule termination of worker nodes periodically - Period period = config.getTerminatePeriod(); - PeriodGranularity granularity = new PeriodGranularity(period, config.getOriginTime(), null); - final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); - - ScheduledExecutors.scheduleAtFixedRate( - exec, - new Duration(System.currentTimeMillis(), startTime), - config.getTerminatePeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - resourceManagementStrategy.doTerminate(taskRunner); - } - } - ); - - started = true; - } - } - - @LifecycleStop - public void stop() - { - synchronized (lock) { - if (!started) { - return; - } - log.info("Stopping Resource Management Scheduler"); - exec.shutdown(); - started = false; - } - } - - public ScalingStats getStats() - { - return resourceManagementStrategy.getStats(); - } -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactory.java deleted file mode 100644 index c4586cd37b7..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord.autoscaling; - -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import io.druid.indexing.overlord.RemoteTaskRunner; - -/** - */ -public interface ResourceManagementSchedulerFactory -{ - public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory); -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactoryImpl.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactoryImpl.java deleted file mode 100644 index 33eb768f1b9..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementSchedulerFactoryImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord.autoscaling; - -import com.google.inject.Inject; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import io.druid.indexing.overlord.RemoteTaskRunner; - -/** - */ -public class ResourceManagementSchedulerFactoryImpl implements ResourceManagementSchedulerFactory -{ - private final ResourceManagementSchedulerConfig config; - private final ResourceManagementStrategy strategy; - - @Inject - public ResourceManagementSchedulerFactoryImpl( - ResourceManagementStrategy strategy, - ResourceManagementSchedulerConfig config, - ScheduledExecutorFactory executorFactory - ) - { - this.config = config; - this.strategy = strategy; - } - - @Override - public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory) - { - if (config.isDoAutoscale()) { - return new ResourceManagementScheduler(runner, strategy, config, executorFactory.create(1, "ScalingExec--%d")); - } - else { - return new NoopResourceManagementScheduler(); - } - } -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java index 7c83c86ed72..bdf5dc42739 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ResourceManagementStrategy.java @@ -19,17 +19,33 @@ package io.druid.indexing.overlord.autoscaling; -import io.druid.indexing.overlord.RemoteTaskRunner; +import io.druid.indexing.overlord.TaskRunner; /** * The ResourceManagementStrategy decides if worker nodes should be provisioned or determined * based on the available tasks in the system and the state of the workers in the system. + * In general, the resource management is tied to the runner. */ -public interface ResourceManagementStrategy +public interface ResourceManagementStrategy { - public boolean doProvision(RemoteTaskRunner runner); + /** + * Equivalent to start() but requires a specific runner instance which holds state of interest. + * This method is intended to be called from the TaskRunner's lifecycle + * + * @param runner The TaskRunner state holder this strategy should use during execution + */ + void startManagement(T runner); - public boolean doTerminate(RemoteTaskRunner runner); + /** + * Equivalent to stop() + * Should be called from TaskRunner's lifecycle + */ + void stopManagement(); - public ScalingStats getStats(); + /** + * Get any interesting stats related to scaling + * + * @return The ScalingStats or `null` if nothing of interest + */ + ScalingStats getStats(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ScalingStats.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ScalingStats.java index ad93392ce0d..b68482ccce9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ScalingStats.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/ScalingStats.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Ordering; import org.joda.time.DateTime; import java.util.Collections; @@ -33,20 +34,20 @@ import java.util.List; */ public class ScalingStats { - public static enum EVENT + public enum EVENT { PROVISION, TERMINATE } - private static final Comparator comparator = new Comparator() + private static final Comparator COMPARATOR = new Ordering() { @Override public int compare(ScalingEvent s1, ScalingEvent s2) { - return -s1.getTimestamp().compareTo(s2.getTimestamp()); + return s2.getTimestamp().compareTo(s1.getTimestamp()); } - }; + }.nullsLast(); private final Object lock = new Object(); @@ -55,10 +56,10 @@ public class ScalingStats public ScalingStats(int capacity) { if (capacity == 0) { - this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create(); + this.recentEvents = MinMaxPriorityQueue.orderedBy(COMPARATOR).create(); } else { this.recentEvents = MinMaxPriorityQueue - .orderedBy(comparator) + .orderedBy(COMPARATOR) .maximumSize(capacity) .create(); } @@ -95,7 +96,7 @@ public class ScalingStats { synchronized (lock) { List retVal = Lists.newArrayList(recentEvents); - Collections.sort(retVal, comparator); + Collections.sort(retVal, COMPARATOR); return retVal; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java index a0bd859baf5..4d8b13f794d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementConfig.java @@ -99,6 +99,7 @@ public class SimpleResourceManagementConfig return this; } + // Do not use this if possible. Assuming all workers will have the same port is bad for containers. public int getWorkerPort() { return workerPort; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java index 2ef95032fcc..120f5f3faf2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java @@ -29,25 +29,33 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; +import io.druid.granularity.PeriodGranularity; import io.druid.indexing.overlord.RemoteTaskRunner; -import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.joda.time.Period; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; /** */ -public class SimpleResourceManagementStrategy implements ResourceManagementStrategy +public class SimpleResourceManagementStrategy implements ResourceManagementStrategy { private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); + private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; + private final ScheduledExecutorService exec; + + private volatile boolean started = false; + private final SimpleResourceManagementConfig config; private final Supplier workerConfigRef; private final ScalingStats scalingStats; @@ -63,19 +71,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat @Inject public SimpleResourceManagementStrategy( SimpleResourceManagementConfig config, - Supplier workerConfigRef + Supplier workerConfigRef, + ResourceManagementSchedulerConfig resourceManagementSchedulerConfig, + ScheduledExecutorService exec ) { this.config = config; this.workerConfigRef = workerConfigRef; this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); + this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; + this.exec = exec; } - @Override - public boolean doProvision(RemoteTaskRunner runner) + boolean doProvision(RemoteTaskRunner runner) { - Collection pendingTasks = runner.getPendingTasks(); - Collection zkWorkers = runner.getWorkers(); + Collection pendingTasks = runner.getPendingTasks(); + Collection zkWorkers = getWorkers(runner); synchronized (lock) { boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); @@ -140,10 +151,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - @Override - public boolean doTerminate(RemoteTaskRunner runner) + boolean doTerminate(RemoteTaskRunner runner) { - Collection pendingTasks = runner.getPendingTasks(); + Collection pendingTasks = runner.getPendingTasks(); synchronized (lock) { final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); if (workerConfig == null) { @@ -179,7 +189,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat currentlyTerminating.clear(); currentlyTerminating.addAll(stillExisting); - Collection workers = runner.getWorkers(); + Collection workers = getWorkers(runner); updateTargetWorkerCount(workerConfig, pendingTasks, workers); if (currentlyTerminating.isEmpty()) { @@ -237,6 +247,70 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } + @Override + public void startManagement(final RemoteTaskRunner runner) + { + synchronized (lock) { + if (started) { + return; + } + + log.info("Started Resource Management Scheduler"); + + ScheduledExecutors.scheduleAtFixedRate( + exec, + resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + doProvision(runner); + } + } + ); + + // Schedule termination of worker nodes periodically + Period period = resourceManagementSchedulerConfig.getTerminatePeriod(); + PeriodGranularity granularity = new PeriodGranularity( + period, + resourceManagementSchedulerConfig.getOriginTime(), + null + ); + final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); + + ScheduledExecutors.scheduleAtFixedRate( + exec, + new Duration(System.currentTimeMillis(), startTime), + resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + doTerminate(runner); + } + } + ); + + started = true; + + } + } + + @Override + public void stopManagement() + { + synchronized (lock) { + if (!started) { + return; + } + log.info("Stopping Resource Management Scheduler"); + exec.shutdown(); + started = false; + } + } + @Override public ScalingStats getStats() { @@ -281,7 +355,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private void updateTargetWorkerCount( final WorkerBehaviorConfig workerConfig, - final Collection pendingTasks, + final Collection pendingTasks, final Collection zkWorkers ) { @@ -358,7 +432,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) + private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) { synchronized (lock) { long now = System.currentTimeMillis(); @@ -372,4 +446,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return false; } } + + public Collection getWorkers(RemoteTaskRunner runner) + { + return runner.getWorkers(); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 3e6d4727ce9..bc42e30785a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -39,12 +39,13 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorageQueryAdapter; -import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.metadata.EntryExistsException; import io.druid.tasklogs.TaskLogStreamer; @@ -409,7 +410,18 @@ public class OverlordResource @Override public Response apply(TaskRunner taskRunner) { - return Response.ok(taskRunner.getWorkers()).build(); + if (taskRunner instanceof RemoteTaskRunner) { + return Response.ok(((RemoteTaskRunner) taskRunner).getWorkers()).build(); + } else { + log.debug( + "Task runner [%s] of type [%s] does not support listing workers", + taskRunner, + taskRunner.getClass().getCanonicalName() + ); + return Response.serverError() + .entity(ImmutableMap.of("error", "Task Runner does not support worker listing")) + .build(); + } } } ); @@ -421,9 +433,9 @@ public class OverlordResource public Response getScalingState() { // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler. - final Optional rms = taskMaster.getResourceManagementScheduler(); + final Optional rms = taskMaster.getScalingStats(); if (rms.isPresent()) { - return Response.ok(rms.get().getStats()).build(); + return Response.ok(rms.get()).build(); } else { return Response.ok().build(); } @@ -563,7 +575,7 @@ public class OverlordResource } if (status.isPresent()) { data.put("statusCode", status.get().getStatusCode().toString()); - if(status.get().isComplete()) { + if (status.get().isComplete()) { data.put("duration", status.get().getDuration()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 5ed425f5df9..f6570f68b37 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -48,7 +48,7 @@ public interface WorkerSelectStrategy * * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. */ - public Optional findWorkerForTask( + Optional findWorkerForTask( final RemoteTaskRunnerConfig config, final ImmutableMap zkWorkers, final Task task diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 576807fd97c..8a4d007564d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -41,6 +41,9 @@ import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; +import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.TaskAnnouncement; @@ -471,7 +474,8 @@ public class RemoteTaskRunnerTest new SimplePathChildrenCacheFactory.Builder().build(), null, DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), - ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d") + ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), + new NoopResourceManagementStrategy() ); remoteTaskRunner.start(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index ecef62fe508..e6bbe776abc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -27,6 +27,7 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; import io.druid.common.guava.DSuppliers; +import io.druid.concurrent.Execs; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.task.NoopTask; @@ -45,6 +46,7 @@ import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -53,6 +55,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; /** @@ -61,9 +64,9 @@ public class SimpleResourceManagementStrategyTest { private AutoScaler autoScaler; private Task testTask; - private SimpleResourceManagementConfig simpleResourceManagementConfig; private SimpleResourceManagementStrategy simpleResourceManagementStrategy; private AtomicReference workerConfig; + private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); @Before public void setUp() throws Exception @@ -75,7 +78,7 @@ public class SimpleResourceManagementStrategyTest testTask = new TestMergeTask( "task1", "dummyDs", - Lists.newArrayList( + Lists.newArrayList( new DataSegment( "dummyDs", new Interval("2012-01-01/2012-01-02"), @@ -92,13 +95,15 @@ public class SimpleResourceManagementStrategyTest indexSpec ); - simpleResourceManagementConfig = new SimpleResourceManagementConfig() + final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig() .setWorkerIdleTimeout(new Period(0)) .setMaxScalingDuration(new Period(1000)) .setNumEventsToTrack(1) .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(""); + final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig(); + workerConfig = new AtomicReference<>( new WorkerBehaviorConfig( null, @@ -108,10 +113,18 @@ public class SimpleResourceManagementStrategyTest simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( simpleResourceManagementConfig, - DSuppliers.of(workerConfig) + DSuppliers.of(workerConfig), + schedulerConfig, + executorService ); } + @After + public void tearDown() + { + executorService.shutdownNow(); + } + @Test public void testSuccessfulProvision() throws Exception { @@ -120,16 +133,16 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()); EasyMock.expect(autoScaler.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("aNode")) + new AutoScalingData(Lists.newArrayList("aNode")) ); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(testTask) ) ); @@ -156,16 +169,16 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()).times(2); EasyMock.expect(autoScaler.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("fake")) + new AutoScalingData(Lists.newArrayList("fake")) ); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(testTask) ) ).times(2); @@ -212,17 +225,17 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.terminateWithIds(EasyMock.>anyObject())) .andReturn(null); EasyMock.expect(autoScaler.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("fake")) + new AutoScalingData(Lists.newArrayList("fake")) ); EasyMock.replay(autoScaler); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(testTask) ) ).times(2); @@ -268,17 +281,17 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScaler); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(testTask) ) ).times(2); - EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( - Arrays.asList( + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.singletonList( new TestZkWorker(testTask) ) ); @@ -302,26 +315,26 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")).times(2); + .andReturn(Lists.newArrayList("ip")).times(2); EasyMock.expect(autoScaler.terminate(EasyMock.>anyObject())).andReturn( - new AutoScalingData(Lists.newArrayList("ip")) + new AutoScalingData(Lists.newArrayList("ip")) ); EasyMock.replay(autoScaler); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(testTask) ) ).times(2); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); - EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( - Arrays.asList( + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.singletonList( new TestZkWorker(testTask) ) ); @@ -354,12 +367,12 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); + .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); @@ -370,7 +383,7 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( Collections.emptyList() ); EasyMock.replay(runner); @@ -384,7 +397,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); + .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner); @@ -402,25 +415,25 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); + .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList() + Collections.emptyList() ).times(3); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(NoopTask.create(), "h1", "i1", "0") ) ).times(3); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy((Predicate) EasyMock.anyObject(), EasyMock.anyInt())).andReturn( + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( Collections.emptyList() ); EasyMock.replay(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - runner + runner ); Assert.assertFalse(terminatedSomething); EasyMock.verify(autoScaler); @@ -430,7 +443,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); + .andReturn(Lists.newArrayList("ip")); EasyMock.replay(autoScaler); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( runner @@ -443,17 +456,17 @@ public class SimpleResourceManagementStrategyTest EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); + .andReturn(Lists.newArrayList("ip")); EasyMock.expect(autoScaler.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h3")) + new AutoScalingData(Lists.newArrayList("h3")) ); // Should provision two new workers EasyMock.expect(autoScaler.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h4")) + new AutoScalingData(Lists.newArrayList("h4")) ); EasyMock.replay(autoScaler); provisionedSomething = simpleResourceManagementStrategy.doProvision( - runner + runner ); Assert.assertTrue(provisionedSomething); EasyMock.verify(autoScaler); @@ -468,12 +481,12 @@ public class SimpleResourceManagementStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getPendingTasks()).andReturn( - Arrays.asList( + Collections.singletonList( new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( + Collections.singletonList( new TestZkWorker(null) ) ).times(1); @@ -484,7 +497,7 @@ public class SimpleResourceManagementStrategyTest ); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - runner + runner ); Assert.assertFalse(terminatedSomething); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index f18f6ebe828..61f6c34a4b6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -20,13 +20,13 @@ package io.druid.indexing.overlord.http; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.guava.CloseQuietly; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -39,7 +39,6 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.HeapMemoryTaskStorage; -import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskRunner; @@ -47,10 +46,7 @@ import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; -import io.druid.indexing.overlord.ZkWorker; -import io.druid.indexing.overlord.autoscaling.NoopResourceManagementScheduler; -import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory; +import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.server.DruidNode; import io.druid.server.initialization.IndexerZkConfig; @@ -156,24 +152,14 @@ public class OverlordResourceTest taskActionClientFactory, druidNode, indexerZkConfig, - new TaskRunnerFactory() + new TaskRunnerFactory() { @Override - public TaskRunner build() + public MockTaskRunner build() { return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches); } }, - new ResourceManagementSchedulerFactory() - { - @Override - public ResourceManagementScheduler build( - RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory - ) - { - return new NoopResourceManagementScheduler(); - } - }, curator, new NoopServiceAnnouncer() { @@ -194,7 +180,7 @@ public class OverlordResourceTest // basic task master lifecycle test taskMaster.start(); announcementLatch.await(); - while(!taskMaster.isLeading()){ + while (!taskMaster.isLeading()) { // I believe the control will never reach here and thread will never sleep but just to be on safe side Thread.sleep(10); } @@ -264,7 +250,8 @@ public class OverlordResourceTest * These method will not timeout until the condition is met so calling method should ensure timeout * This method also assumes that the task with given taskId is present * */ - private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException { + private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException + { while (true) { Response response = overlordResource.getTaskStatus(taskId); if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) { @@ -312,28 +299,28 @@ public class OverlordResourceTest { final String taskId = task.getId(); ListenableFuture future = MoreExecutors.listeningDecorator( - Execs.singleThreaded( - "noop_test_task_exec_%s" - ) + Execs.singleThreaded( + "noop_test_task_exec_%s" + ) ).submit( - new Callable() - { - @Override - public TaskStatus call() throws Exception + new Callable() { - // adding of task to list of runningTasks should be done before count down as - // getRunningTasks may not include the task for which latch has been counted down - // Count down to let know that task is actually running - // this is equivalent of getting process holder to run task in ForkingTaskRunner - runningTasks.add(taskId); - runLatches[Integer.parseInt(taskId)].countDown(); - // Wait for completion count down - completionLatches[Integer.parseInt(taskId)].await(); - taskRunnerWorkItems.remove(taskId); - runningTasks.remove(taskId); - return TaskStatus.success(taskId); + @Override + public TaskStatus call() throws Exception + { + // adding of task to list of runningTasks should be done before count down as + // getRunningTasks may not include the task for which latch has been counted down + // Count down to let know that task is actually running + // this is equivalent of getting process holder to run task in ForkingTaskRunner + runningTasks.add(taskId); + runLatches[Integer.parseInt(taskId)].countDown(); + // Wait for completion count down + completionLatches[Integer.parseInt(taskId)].await(); + taskRunnerWorkItems.remove(taskId); + runningTasks.remove(taskId); + return TaskStatus.success(taskId); + } } - } ); TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future); taskRunnerWorkItems.put(taskId, taskRunnerWorkItem); @@ -347,16 +334,16 @@ public class OverlordResourceTest public synchronized Collection getRunningTasks() { List runningTaskList = Lists.transform( - runningTasks, - new Function() - { - @Nullable - @Override - public TaskRunnerWorkItem apply(String input) + runningTasks, + new Function() { - return taskRunnerWorkItems.get(input); + @Nullable + @Override + public TaskRunnerWorkItem apply(String input) + { + return taskRunnerWorkItems.get(input); + } } - } ); return runningTaskList; } @@ -374,9 +361,9 @@ public class OverlordResourceTest } @Override - public Collection getWorkers() + public Optional getScalingStats() { - return ImmutableList.of(); + return Optional.absent(); } } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 1dd11fb0d0a..551064aaafd 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -61,8 +61,6 @@ import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory; -import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactoryImpl; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy; @@ -141,9 +139,6 @@ public class CliOverlord extends ServerRunnable binder.bind(TaskActionToolbox.class).in(LazySingleton.class); binder.bind(TaskLockbox.class).in(LazySingleton.class); binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); - binder.bind(ResourceManagementSchedulerFactory.class) - .to(ResourceManagementSchedulerFactoryImpl.class) - .in(LazySingleton.class); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));