Merge pull request #1953 from metamx/taskRunnerResourceManagement

Move resource management to be the responsibility of the TaskRunner
Nishant 2016-01-20 22:27:47 +05:30
commit ac6c90e657
24 changed files with 367 additions and 440 deletions

View File

@@ -52,6 +52,7 @@ import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.common.tasklogs.LogUtils;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import io.druid.indexing.worker.config.WorkerConfig;
 import io.druid.query.DruidMetrics;
@@ -542,9 +543,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   }

   @Override
-  public Collection<ZkWorker> getWorkers()
+  public Optional<ScalingStats> getScalingStats()
   {
-    return ImmutableList.of();
+    return Optional.absent();
   }

   @Override

View File

@@ -31,8 +31,8 @@ import io.druid.tasklogs.TaskLogPusher;
 import java.util.Properties;

 /**
  */
-public class ForkingTaskRunnerFactory implements TaskRunnerFactory
+public class ForkingTaskRunnerFactory implements TaskRunnerFactory<ForkingTaskRunner>
 {
   private final ForkingTaskRunnerConfig config;
   private final TaskConfig taskConfig;
@@ -51,7 +51,8 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
       final ObjectMapper jsonMapper,
       final TaskLogPusher persistentTaskLogs,
       @Self DruidNode node
-  ) {
+  )
+  {
     this.config = config;
     this.taskConfig = taskConfig;
     this.workerConfig = workerConfig;
@@ -62,7 +63,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
   }

   @Override
-  public TaskRunner build()
+  public ForkingTaskRunner build()
   {
     return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
   }

View File

@@ -53,9 +53,12 @@ import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
+import io.druid.concurrent.Execs;
 import io.druid.curator.cache.PathChildrenCacheFactory;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
@@ -103,7 +106,6 @@ import java.util.concurrent.TimeUnit;
  * <p/>
  * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
  * fail. The RemoteTaskRunner depends on another component to create additional worker resources.
- * For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
  * <p/>
  * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
  * worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
@@ -150,6 +152,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
   private final ListeningScheduledExecutorService cleanupExec;

   private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
+  private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;

   public RemoteTaskRunner(
       ObjectMapper jsonMapper,
@@ -159,7 +162,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
       PathChildrenCacheFactory pathChildrenCacheFactory,
       HttpClient httpClient,
       Supplier<WorkerBehaviorConfig> workerConfigRef,
-      ScheduledExecutorService cleanupExec
+      ScheduledExecutorService cleanupExec,
+      ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
   )
   {
     this.jsonMapper = jsonMapper;
@@ -172,6 +176,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     this.httpClient = httpClient;
     this.workerConfigRef = workerConfigRef;
     this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
+    this.resourceManagement = resourceManagement;
   }

   @LifecycleStart
@@ -284,6 +289,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
           waitingForMonitor.wait();
         }
       }
+      resourceManagement.startManagement(this);
       started = true;
     }
     catch (Exception e) {
@@ -299,6 +305,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
         return;
       }
       started = false;
+
+      resourceManagement.stopManagement();
+
       for (ZkWorker zkWorker : zkWorkers.values()) {
         zkWorker.close();
       }
@@ -315,7 +324,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     return ImmutableList.of();
   }

-  @Override
   public Collection<ZkWorker> getWorkers()
   {
     return ImmutableList.copyOf(zkWorkers.values());
@@ -340,6 +348,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
   }

+  @Override
+  public Optional<ScalingStats> getScalingStats()
+  {
+    return Optional.of(resourceManagement.getStats());
+  }
+
   public ZkWorker findWorkerRunningTask(String taskId)
   {
     for (ZkWorker zkWorker : zkWorkers.values()) {
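
The upshot of this file's changes is that the strategy's lifecycle is now bracketed by the runner's own start/stop instead of being wired up separately by TaskMaster. A minimal sketch of the resulting call order (the helper class and method are illustrative, not part of the patch):

import io.druid.indexing.overlord.RemoteTaskRunner;

public class LifecycleSketch
{
  static void illustrate(RemoteTaskRunner runner)
  {
    runner.start(); // internally calls resourceManagement.startManagement(this)
    // ... tasks are assigned; the strategy may provision or terminate workers ...
    runner.stop();  // internally calls resourceManagement.stopManagement()
  }
}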

View File

@@ -23,9 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
 import com.google.inject.Inject;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
+import com.metamx.common.logger.Logger;
 import com.metamx.http.client.HttpClient;
 import io.druid.curator.cache.SimplePathChildrenCacheFactory;
 import io.druid.guice.annotations.Global;
+import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
+import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
+import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
+import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
+import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
 import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import io.druid.server.initialization.IndexerZkConfig;
@@ -35,8 +42,9 @@ import java.util.concurrent.ScheduledExecutorService;

 /**
  */
-public class RemoteTaskRunnerFactory implements TaskRunnerFactory
+public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
 {
+  private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
   private final CuratorFramework curator;
   private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
   private final IndexerZkConfig zkPaths;
@@ -44,6 +52,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
   private final HttpClient httpClient;
   private final Supplier<WorkerBehaviorConfig> workerConfigRef;
   private final ScheduledExecutorService cleanupExec;
+  private final SimpleResourceManagementConfig config;
+  private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
+  private final ScheduledExecutorService exec;

   @Inject
   public RemoteTaskRunnerFactory(
@@ -53,7 +64,10 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
       final ObjectMapper jsonMapper,
       @Global final HttpClient httpClient,
       final Supplier<WorkerBehaviorConfig> workerConfigRef,
-      ScheduledExecutorFactory factory
+      final ScheduledExecutorFactory factory,
+      final SimpleResourceManagementConfig config,
+      final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
+      final ScheduledExecutorService exec
   )
   {
     this.curator = curator;
@@ -62,12 +76,26 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
     this.jsonMapper = jsonMapper;
     this.httpClient = httpClient;
     this.workerConfigRef = workerConfigRef;
-    this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d");
+    this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d");
+    this.config = config;
+    this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
+    this.exec = exec;
   }

   @Override
-  public TaskRunner build()
+  public RemoteTaskRunner build()
   {
+    final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
+    if (resourceManagementSchedulerConfig.isDoAutoscale()) {
+      resourceManagementStrategy = new SimpleResourceManagementStrategy(
+          config,
+          workerConfigRef,
+          resourceManagementSchedulerConfig,
+          exec
+      );
+    } else {
+      resourceManagementStrategy = new NoopResourceManagementStrategy<>();
+    }
     return new RemoteTaskRunner(
         jsonMapper,
         remoteTaskRunnerConfig,
@@ -79,7 +107,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
             .build(),
         httpClient,
         workerConfigRef,
-        cleanupExec
+        cleanupExec,
+        resourceManagementStrategy
     );
   }
 }

View File

@@ -22,8 +22,6 @@ package io.druid.indexing.overlord;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
-import com.metamx.common.concurrent.ScheduledExecutorFactory;
-import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
@@ -34,8 +32,7 @@ import io.druid.guice.annotations.Self;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.task.Task;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.config.TaskQueueConfig;
 import io.druid.server.DruidNode;
 import io.druid.server.initialization.IndexerZkConfig;
@@ -64,7 +61,6 @@ public class TaskMaster
   private volatile boolean leading = false;
   private volatile TaskRunner taskRunner;
   private volatile TaskQueue taskQueue;
-  private volatile ResourceManagementScheduler resourceManagementScheduler;

   private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);

@@ -77,7 +73,6 @@ public class TaskMaster
       @Self final DruidNode node,
       final IndexerZkConfig zkPaths,
       final TaskRunnerFactory runnerFactory,
-      final ResourceManagementSchedulerFactory managementSchedulerFactory,
       final CuratorFramework curator,
       final ServiceAnnouncer serviceAnnouncer,
       final ServiceEmitter emitter
@@ -118,14 +113,6 @@ public class TaskMaster
                   .emit();
             }
             leaderLifecycle.addManagedInstance(taskRunner);
-            if (taskRunner instanceof RemoteTaskRunner) {
-              final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
-              resourceManagementScheduler = managementSchedulerFactory.build(
-                  (RemoteTaskRunner) taskRunner,
-                  executorFactory
-              );
-              leaderLifecycle.addManagedInstance(resourceManagementScheduler);
-            }
             leaderLifecycle.addManagedInstance(taskQueue);
             leaderLifecycle.addHandler(
                 new Lifecycle.Handler()
@@ -285,10 +272,10 @@ public class TaskMaster
     }
   }

-  public Optional<ResourceManagementScheduler> getResourceManagementScheduler()
+  public Optional<ScalingStats> getScalingStats()
   {
     if (leading) {
-      return Optional.fromNullable(resourceManagementScheduler);
+      return taskRunner.getScalingStats();
     } else {
       return Optional.absent();
     }

View File

@@ -19,10 +19,12 @@

 package io.druid.indexing.overlord;

+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.metamx.common.Pair;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;

 import java.util.Collection;
 import java.util.List;
@@ -45,7 +47,7 @@ public interface TaskRunner
    *
    * @return task status, eventually
    */
-  public ListenableFuture<TaskStatus> run(Task task);
+  ListenableFuture<TaskStatus> run(Task task);

   /**
    * Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
@@ -53,19 +55,24 @@ public interface TaskRunner
    *
    * @param taskid task ID to clean up resources for
    */
-  public void shutdown(String taskid);
+  void shutdown(String taskid);

   /**
    * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling
    * stopping, "run" will not accept further tasks.
    */
-  public void stop();
+  void stop();

-  public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
+  Collection<? extends TaskRunnerWorkItem> getRunningTasks();

-  public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
+  Collection<? extends TaskRunnerWorkItem> getPendingTasks();

-  public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
+  Collection<? extends TaskRunnerWorkItem> getKnownTasks();

-  public Collection<ZkWorker> getWorkers();
+  /**
+   * Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities
+   *
+   * @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
+   */
+  Optional<ScalingStats> getScalingStats();
 }
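
Concretely, the new contract distinguishes "this runner cannot scale" from "no scaling activity yet". A caller-side sketch of how the Optional is meant to be consumed (the class and method names here are hypothetical, not part of the patch):

import com.google.common.base.Optional;

import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

public class ScalingStatsDemo
{
  // Absent means the runner has no scalable resource at all; a present but
  // empty ScalingStats just means no provision/terminate events were recorded.
  static void report(TaskRunner runner)
  {
    final Optional<ScalingStats> stats = runner.getScalingStats();
    if (stats.isPresent()) {
      System.out.println("Scaling stats: " + stats.get());
    } else {
      System.out.println("Runner does not manage scalable resources");
    }
  }
}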

View File

@@ -19,7 +19,7 @@

 package io.druid.indexing.overlord;

-public interface TaskRunnerFactory
+public interface TaskRunnerFactory<T extends TaskRunner>
 {
-  public TaskRunner build();
+  T build();
 }
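
Making the factory generic lets call sites that hold a concrete factory type obtain the concrete runner without a cast. A sketch of the benefit (the helper class and method are hypothetical):

import java.util.Collection;

import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
import io.druid.indexing.overlord.ZkWorker;

public class FactoryDemo
{
  // build() now returns RemoteTaskRunner, so methods that were dropped from
  // the TaskRunner interface (like getWorkers) stay reachable without casting.
  static Collection<ZkWorker> workersOf(RemoteTaskRunnerFactory factory)
  {
    final RemoteTaskRunner runner = factory.build();
    return runner.getWorkers();
  }
}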

View File

@@ -19,11 +19,11 @@

 package io.druid.indexing.overlord;

+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -33,7 +33,6 @@ import com.google.inject.Inject;
 import com.metamx.common.Pair;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.service.AlertEvent;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.concurrent.Execs;
@@ -42,6 +41,7 @@ import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.TaskToolboxFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
@@ -204,9 +204,9 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
   }

   @Override
-  public Collection<ZkWorker> getWorkers()
+  public Optional<ScalingStats> getScalingStats()
   {
-    return Lists.newArrayList();
+    return Optional.absent();
   }

   @Override

View File

@@ -34,17 +34,17 @@ import java.util.List;
 })
 public interface AutoScaler<T>
 {
-  public int getMinNumWorkers();
+  int getMinNumWorkers();

-  public int getMaxNumWorkers();
+  int getMaxNumWorkers();

-  public T getEnvConfig();
+  T getEnvConfig();

-  public AutoScalingData provision();
+  AutoScalingData provision();

-  public AutoScalingData terminate(List<String> ips);
+  AutoScalingData terminate(List<String> ips);

-  public AutoScalingData terminateWithIds(List<String> ids);
+  AutoScalingData terminateWithIds(List<String> ids);

   /**
    * Provides a lookup of ip addresses to node ids
@@ -53,7 +53,7 @@ public interface AutoScaler<T>
    *
    * @return node ids
    */
-  public List<String> ipToIdLookup(List<String> ips);
+  List<String> ipToIdLookup(List<String> ips);

   /**
    * Provides a lookup of node ids to ip addresses
@@ -62,5 +62,5 @@ public interface AutoScaler<T>
    *
    * @return IPs associated with the node
    */
-  public List<String> idToIpLookup(List<String> nodeIds);
+  List<String> idToIpLookup(List<String> nodeIds);
 }

View File

@ -1,52 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.logger.Logger;
/**
*/
public class NoopResourceManagementScheduler extends ResourceManagementScheduler
{
private static final Logger log = new Logger(NoopResourceManagementScheduler.class);
public NoopResourceManagementScheduler()
{
super(null, null, null, null);
}
@Override
public void start()
{
log.info("Autoscaling is disabled.");
}
@Override
public void stop()
{
// do nothing
}
@Override
public ScalingStats getStats()
{
return new ScalingStats(0);
}
}

View File

@@ -0,0 +1,43 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord.autoscaling;
+
+import io.druid.indexing.overlord.TaskRunner;
+
+public class NoopResourceManagementStrategy<T extends TaskRunner> implements ResourceManagementStrategy<T>
+{
+  @Override
+  public void startManagement(T runner)
+  {
+  }
+
+  @Override
+  public void stopManagement()
+  {
+  }
+
+  @Override
+  public ScalingStats getStats()
+  {
+    return null;
+  }
+}

View File

@ -1,127 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually
* occur. That decision is made in the {@link ResourceManagementStrategy}.
*/
public class ResourceManagementScheduler
{
private static final Logger log = new Logger(ResourceManagementScheduler.class);
private final RemoteTaskRunner taskRunner;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ResourceManagementSchedulerConfig config;
private final ScheduledExecutorService exec;
private final Object lock = new Object();
private volatile boolean started = false;
public ResourceManagementScheduler(
RemoteTaskRunner taskRunner,
ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorService exec
)
{
this.taskRunner = taskRunner;
this.resourceManagementStrategy = resourceManagementStrategy;
this.config = config;
this.exec = exec;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
log.info("Started Resource Management Scheduler");
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getProvisionPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doProvision(taskRunner);
}
}
);
// Schedule termination of worker nodes periodically
Period period = config.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(period, config.getOriginTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(System.currentTimeMillis(), startTime),
config.getTerminatePeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doTerminate(taskRunner);
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
started = false;
}
}
public ScalingStats getStats()
{
return resourceManagementStrategy.getStats();
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;
/**
*/
public interface ResourceManagementSchedulerFactory
{
public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory);
}

View File

@ -1,54 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;
/**
*/
public class ResourceManagementSchedulerFactoryImpl implements ResourceManagementSchedulerFactory
{
private final ResourceManagementSchedulerConfig config;
private final ResourceManagementStrategy strategy;
@Inject
public ResourceManagementSchedulerFactoryImpl(
ResourceManagementStrategy strategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorFactory executorFactory
)
{
this.config = config;
this.strategy = strategy;
}
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory)
{
if (config.isDoAutoscale()) {
return new ResourceManagementScheduler(runner, strategy, config, executorFactory.create(1, "ScalingExec--%d"));
}
else {
return new NoopResourceManagementScheduler();
}
}
}

View File

@@ -19,17 +19,33 @@

 package io.druid.indexing.overlord.autoscaling;

-import io.druid.indexing.overlord.RemoteTaskRunner;
+import io.druid.indexing.overlord.TaskRunner;

 /**
  * The ResourceManagementStrategy decides if worker nodes should be provisioned or terminated
  * based on the available tasks in the system and the state of the workers in the system.
+ * In general, the resource management is tied to the runner.
  */
-public interface ResourceManagementStrategy
+public interface ResourceManagementStrategy<T extends TaskRunner>
 {
-  public boolean doProvision(RemoteTaskRunner runner);
+  /**
+   * Equivalent to start() but requires a specific runner instance which holds state of interest.
+   * This method is intended to be called from the TaskRunner's lifecycle
+   *
+   * @param runner The TaskRunner state holder this strategy should use during execution
+   */
+  void startManagement(T runner);

-  public boolean doTerminate(RemoteTaskRunner runner);
+  /**
+   * Equivalent to stop()
+   * Should be called from TaskRunner's lifecycle
+   */
+  void stopManagement();

-  public ScalingStats getStats();
+  /**
+   * Get any interesting stats related to scaling
+   *
+   * @return The ScalingStats or `null` if nothing of interest
+   */
+  ScalingStats getStats();
 }
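
A minimal sketch of an implementation against the new interface (the class is hypothetical; a real strategy would schedule periodic work in startManagement and release it in stopManagement, as SimpleResourceManagementStrategy does below):

import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

public class MinimalResourceManagementStrategy implements ResourceManagementStrategy<TaskRunner>
{
  private final ScalingStats scalingStats = new ScalingStats(10); // keep the last 10 events

  @Override
  public void startManagement(TaskRunner runner)
  {
    // Inspect runner state here, e.g. runner.getPendingTasks().size(),
    // and schedule provision/terminate checks on an executor.
  }

  @Override
  public void stopManagement()
  {
    // Cancel anything scheduled in startManagement().
  }

  @Override
  public ScalingStats getStats()
  {
    return scalingStats;
  }
}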

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
 import org.joda.time.DateTime;

 import java.util.Collections;
@@ -33,20 +34,20 @@ import java.util.List;
  */
 public class ScalingStats
 {
-  public static enum EVENT
+  public enum EVENT
   {
     PROVISION,
     TERMINATE
   }

-  private static final Comparator<ScalingEvent> comparator = new Comparator<ScalingEvent>()
+  private static final Comparator<ScalingEvent> COMPARATOR = new Ordering<ScalingEvent>()
   {
     @Override
     public int compare(ScalingEvent s1, ScalingEvent s2)
     {
-      return -s1.getTimestamp().compareTo(s2.getTimestamp());
+      return s2.getTimestamp().compareTo(s1.getTimestamp());
     }
-  };
+  }.nullsLast();

   private final Object lock = new Object();

@@ -55,10 +56,10 @@ public class ScalingStats
   public ScalingStats(int capacity)
   {
     if (capacity == 0) {
-      this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create();
+      this.recentEvents = MinMaxPriorityQueue.orderedBy(COMPARATOR).create();
     } else {
       this.recentEvents = MinMaxPriorityQueue
-          .orderedBy(comparator)
+          .orderedBy(COMPARATOR)
           .maximumSize(capacity)
           .create();
     }
@@ -95,7 +96,7 @@ public class ScalingStats
   {
     synchronized (lock) {
       List<ScalingEvent> retVal = Lists.newArrayList(recentEvents);
-      Collections.sort(retVal, comparator);
+      Collections.sort(retVal, COMPARATOR);
       return retVal;
     }
   }
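
Two things change in the comparator: comparing the operands in reverse order replaces manual negation (negating compareTo can overflow if it ever returns Integer.MIN_VALUE), and Guava's nullsLast() keeps null elements from throwing. A standalone illustration, with Strings standing in for ScalingEvents:

import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Ordering;

public class OrderingDemo
{
  public static void main(String[] args)
  {
    // Newest-first ordering, with nulls sorted to the end instead of throwing.
    final Ordering<String> newestFirst = new Ordering<String>()
    {
      @Override
      public int compare(String s1, String s2)
      {
        return s2.compareTo(s1); // descending, like the timestamp comparison above
      }
    }.nullsLast();

    final List<String> events = Arrays.asList("2016-01-19", null, "2016-01-20");
    System.out.println(newestFirst.sortedCopy(events)); // [2016-01-20, 2016-01-19, null]
  }
}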

View File

@@ -99,6 +99,7 @@ public class SimpleResourceManagementConfig
     return this;
   }

+  // Do not use this if possible. Assuming all workers will have the same port is bad for containers.
   public int getWorkerPort()
   {
     return workerPort;

View File

@@ -29,25 +29,33 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.ISE;
+import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.emitter.EmittingLogger;
+import io.druid.granularity.PeriodGranularity;
 import io.druid.indexing.overlord.RemoteTaskRunner;
-import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
 import io.druid.indexing.overlord.ZkWorker;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
+import org.joda.time.Period;

 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;

 /**
  */
-public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
+public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<RemoteTaskRunner>
 {
   private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);

+  private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
+  private final ScheduledExecutorService exec;
+  private volatile boolean started = false;
+
   private final SimpleResourceManagementConfig config;
   private final Supplier<WorkerBehaviorConfig> workerConfigRef;
   private final ScalingStats scalingStats;
@@ -63,19 +71,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
   @Inject
   public SimpleResourceManagementStrategy(
       SimpleResourceManagementConfig config,
-      Supplier<WorkerBehaviorConfig> workerConfigRef
+      Supplier<WorkerBehaviorConfig> workerConfigRef,
+      ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
+      ScheduledExecutorService exec
   )
   {
     this.config = config;
     this.workerConfigRef = workerConfigRef;
     this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
+    this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
+    this.exec = exec;
   }

-  @Override
-  public boolean doProvision(RemoteTaskRunner runner)
+  boolean doProvision(RemoteTaskRunner runner)
   {
-    Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
-    Collection<ZkWorker> zkWorkers = runner.getWorkers();
+    Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
+    Collection<ZkWorker> zkWorkers = getWorkers(runner);
     synchronized (lock) {
       boolean didProvision = false;
       final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@@ -140,10 +151,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
     }
   }

-  @Override
-  public boolean doTerminate(RemoteTaskRunner runner)
+  boolean doTerminate(RemoteTaskRunner runner)
   {
-    Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
+    Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
     synchronized (lock) {
       final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
       if (workerConfig == null) {
@@ -179,7 +189,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
       currentlyTerminating.clear();
       currentlyTerminating.addAll(stillExisting);

-      Collection<ZkWorker> workers = runner.getWorkers();
+      Collection<ZkWorker> workers = getWorkers(runner);
       updateTargetWorkerCount(workerConfig, pendingTasks, workers);

       if (currentlyTerminating.isEmpty()) {
@@ -237,6 +247,70 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
     }
   }

+  @Override
+  public void startManagement(final RemoteTaskRunner runner)
+  {
+    synchronized (lock) {
+      if (started) {
+        return;
+      }
+
+      log.info("Started Resource Management Scheduler");
+
+      ScheduledExecutors.scheduleAtFixedRate(
+          exec,
+          resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(),
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              doProvision(runner);
+            }
+          }
+      );
+
+      // Schedule termination of worker nodes periodically
+      Period period = resourceManagementSchedulerConfig.getTerminatePeriod();
+      PeriodGranularity granularity = new PeriodGranularity(
+          period,
+          resourceManagementSchedulerConfig.getOriginTime(),
+          null
+      );
+      final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
+
+      ScheduledExecutors.scheduleAtFixedRate(
+          exec,
+          new Duration(System.currentTimeMillis(), startTime),
+          resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(),
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              doTerminate(runner);
+            }
+          }
+      );
+
+      started = true;
+    }
+  }
+
+  @Override
+  public void stopManagement()
+  {
+    synchronized (lock) {
+      if (!started) {
+        return;
+      }
+      log.info("Stopping Resource Management Scheduler");
+      exec.shutdown();
+      started = false;
+    }
+  }
+
   @Override
   public ScalingStats getStats()
   {
@@ -281,7 +355,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
   private void updateTargetWorkerCount(
       final WorkerBehaviorConfig workerConfig,
-      final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
+      final Collection<? extends TaskRunnerWorkItem> pendingTasks,
       final Collection<ZkWorker> zkWorkers
   )
   {
@@ -358,7 +432,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
     }
   }

-  private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
+  private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> pendingTasks)
   {
     synchronized (lock) {
       long now = System.currentTimeMillis();
@@ -372,4 +446,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
       return false;
     }
   }
+
+  public Collection<ZkWorker> getWorkers(RemoteTaskRunner runner)
+  {
+    return runner.getWorkers();
+  }
 }
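
The new getWorkers(runner) indirection (rather than calling runner.getWorkers() inline) gives subclasses and tests a seam for substituting the worker view. A hypothetical subclass that hides lazy workers from scaling decisions, assuming the constructor parameters shown above:

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;

import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;

public class NonLazyResourceManagementStrategy extends SimpleResourceManagementStrategy
{
  public NonLazyResourceManagementStrategy(
      SimpleResourceManagementConfig config,
      Supplier<WorkerBehaviorConfig> workerConfigRef,
      ResourceManagementSchedulerConfig schedulerConfig,
      ScheduledExecutorService exec
  )
  {
    super(config, workerConfigRef, schedulerConfig, exec);
  }

  @Override
  public Collection<ZkWorker> getWorkers(RemoteTaskRunner runner)
  {
    // Exclude workers already marked lazy, so they are not counted as capacity.
    final Set<ZkWorker> lazy = Sets.newHashSet(runner.getLazyWorkers());
    return Collections2.filter(
        runner.getWorkers(),
        new Predicate<ZkWorker>()
        {
          @Override
          public boolean apply(ZkWorker worker)
          {
            return !lazy.contains(worker);
          }
        }
    );
  }
}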

View File

@@ -39,12 +39,13 @@ import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.RemoteTaskRunner;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
 import io.druid.indexing.overlord.TaskRunner;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
 import io.druid.indexing.overlord.TaskStorageQueryAdapter;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import io.druid.metadata.EntryExistsException;
 import io.druid.tasklogs.TaskLogStreamer;
@@ -409,7 +410,18 @@ public class OverlordResource
           @Override
           public Response apply(TaskRunner taskRunner)
           {
-            return Response.ok(taskRunner.getWorkers()).build();
+            if (taskRunner instanceof RemoteTaskRunner) {
+              return Response.ok(((RemoteTaskRunner) taskRunner).getWorkers()).build();
+            } else {
+              log.debug(
+                  "Task runner [%s] of type [%s] does not support listing workers",
+                  taskRunner,
+                  taskRunner.getClass().getCanonicalName()
+              );
+              return Response.serverError()
+                             .entity(ImmutableMap.of("error", "Task Runner does not support worker listing"))
+                             .build();
+            }
           }
         }
     );
@@ -421,9 +433,9 @@ public class OverlordResource
   public Response getScalingState()
   {
     // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler.
-    final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
+    final Optional<ScalingStats> rms = taskMaster.getScalingStats();
     if (rms.isPresent()) {
-      return Response.ok(rms.get().getStats()).build();
+      return Response.ok(rms.get()).build();
     } else {
       return Response.ok().build();
     }
@@ -563,7 +575,7 @@ public class OverlordResource
     }
     if (status.isPresent()) {
       data.put("statusCode", status.get().getStatusCode().toString());
-      if(status.get().isComplete()) {
+      if (status.get().isComplete()) {
        data.put("duration", status.get().getDuration());
      }
    }

View File

@@ -48,7 +48,7 @@ public interface WorkerSelectStrategy
    *
    * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
    */
-  public Optional<ImmutableZkWorker> findWorkerForTask(
+  Optional<ImmutableZkWorker> findWorkerForTask(
       final RemoteTaskRunnerConfig config,
       final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
       final Task task

View File

@@ -41,6 +41,9 @@ import io.druid.indexing.common.TestRealtimeTask;
 import io.druid.indexing.common.TestUtils;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.common.task.TaskResource;
+import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
+import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import io.druid.indexing.worker.TaskAnnouncement;
@@ -475,7 +478,8 @@ public class RemoteTaskRunnerTest
         new SimplePathChildrenCacheFactory.Builder().build(),
         null,
         DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
-        ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d")
+        ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
+        new NoopResourceManagementStrategy<RemoteTaskRunner>()
     );

     remoteTaskRunner.start();

View File

@ -27,6 +27,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
@ -45,6 +46,7 @@ import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -53,6 +55,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -61,9 +64,9 @@ public class SimpleResourceManagementStrategyTest
{ {
private AutoScaler autoScaler; private AutoScaler autoScaler;
private Task testTask; private Task testTask;
private SimpleResourceManagementConfig simpleResourceManagementConfig;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy; private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig; private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -75,7 +78,7 @@ public class SimpleResourceManagementStrategyTest
testTask = new TestMergeTask( testTask = new TestMergeTask(
"task1", "task1",
"dummyDs", "dummyDs",
Lists.<DataSegment>newArrayList( Lists.newArrayList(
new DataSegment( new DataSegment(
"dummyDs", "dummyDs",
new Interval("2012-01-01/2012-01-02"), new Interval("2012-01-01/2012-01-02"),
@ -92,13 +95,15 @@ public class SimpleResourceManagementStrategyTest
indexSpec indexSpec
); );
simpleResourceManagementConfig = new SimpleResourceManagementConfig() final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0)) .setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000)) .setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1) .setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0)) .setPendingTaskTimeout(new Period(0))
.setWorkerVersion(""); .setWorkerVersion("");
final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig();
workerConfig = new AtomicReference<>( workerConfig = new AtomicReference<>(
new WorkerBehaviorConfig( new WorkerBehaviorConfig(
null, null,
@ -108,10 +113,18 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
simpleResourceManagementConfig, simpleResourceManagementConfig,
DSuppliers.of(workerConfig) DSuppliers.of(workerConfig),
schedulerConfig,
executorService
); );
} }
@After
public void tearDown()
{
executorService.shutdownNow();
}
@Test @Test
public void testSuccessfulProvision() throws Exception public void testSuccessfulProvision() throws Exception
{ {
@ -120,16 +133,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScaler.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("aNode")) new AutoScalingData(Lists.newArrayList("aNode"))
); );
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
) )
); );
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList( Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
) )
); );
@ -156,16 +169,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScaler.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake")) new AutoScalingData(Lists.newArrayList("fake"))
); );
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList( Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
) )
).times(2); ).times(2);
@ -212,17 +225,17 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null); .andReturn(null);
EasyMock.expect(autoScaler.provision()).andReturn( EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake")) new AutoScalingData(Lists.newArrayList("fake"))
); );
EasyMock.replay(autoScaler); EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn( EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList( Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList( Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
) )
).times(2); ).times(2);
@@ -268,17 +281,17 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.replay(autoScaler);
 RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
 EasyMock.expect(runner.getPendingTasks()).andReturn(
-    Arrays.asList(
+    Collections.singletonList(
         new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
     )
 ).times(2);
 EasyMock.expect(runner.getWorkers()).andReturn(
-    Arrays.<ZkWorker>asList(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(testTask)
     )
 ).times(2);
-EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
-    Arrays.<ZkWorker>asList(
+EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(testTask)
     )
 );
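Note: replacing the cast (Predicate<ZkWorker>) EasyMock.anyObject() with the type witness EasyMock.<Predicate<ZkWorker>>anyObject() records the same matcher while removing an unchecked cast. A runnable sketch of the difference; WorkerSelector is a hypothetical interface invented for this example:

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.easymock.EasyMock;

// Hypothetical interface, for illustration only.
interface WorkerSelector
{
  int pick(Predicate<String> filter, int max);
}

public class TypedMatcherDemo
{
  public static void main(String[] args)
  {
    WorkerSelector selector = EasyMock.createMock(WorkerSelector.class);

    // The old style compiles only with an unchecked cast:
    //   selector.pick((Predicate<String>) EasyMock.anyObject(), EasyMock.anyInt());
    // A type witness hands anyObject() its type parameter directly:
    EasyMock.expect(selector.pick(EasyMock.<Predicate<String>>anyObject(), EasyMock.anyInt()))
            .andReturn(1);
    EasyMock.replay(selector);

    System.out.println(selector.pick(Predicates.<String>alwaysTrue(), 5)); // 1
    EasyMock.verify(selector);
  }
}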
@@ -302,26 +315,26 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1).times(2);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip")).times(2);
+        .andReturn(Lists.newArrayList("ip")).times(2);
 EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
-    new AutoScalingData(Lists.<String>newArrayList("ip"))
+    new AutoScalingData(Lists.newArrayList("ip"))
 );
 EasyMock.replay(autoScaler);
 RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
 EasyMock.expect(runner.getPendingTasks()).andReturn(
-    Arrays.asList(
+    Collections.singletonList(
         new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
     )
 ).times(2);
 EasyMock.expect(runner.getWorkers()).andReturn(
-    Arrays.<ZkWorker>asList(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(testTask)
     )
 ).times(2);
 EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
-EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
-    Arrays.<ZkWorker>asList(
+EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(testTask)
     )
 );
@@ -354,12 +367,12 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip"));
+        .andReturn(Lists.newArrayList("ip"));
 EasyMock.replay(autoScaler);
 RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
 EasyMock.expect(runner.getPendingTasks()).andReturn(
-    Arrays.asList(
+    Collections.singletonList(
         new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
     )
 ).times(2);
@@ -370,7 +383,7 @@ public class SimpleResourceManagementStrategyTest
     )
 ).times(2);
 EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
-EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
+EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
     Collections.<ZkWorker>emptyList()
 );
 EasyMock.replay(runner);
@@ -384,7 +397,7 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip"));
+        .andReturn(Lists.newArrayList("ip"));
 EasyMock.replay(autoScaler);
 boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
@@ -402,25 +415,25 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip"));
+        .andReturn(Lists.newArrayList("ip"));
 EasyMock.replay(autoScaler);
 RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
 EasyMock.expect(runner.getPendingTasks()).andReturn(
-    Arrays.<RemoteTaskRunnerWorkItem>asList()
+    Collections.<RemoteTaskRunnerWorkItem>emptyList()
 ).times(3);
 EasyMock.expect(runner.getWorkers()).andReturn(
-    Arrays.<ZkWorker>asList(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
     )
 ).times(3);
 EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
-EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
+EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
     Collections.<ZkWorker>emptyList()
 );
 EasyMock.replay(runner);
 boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
     runner
 );
 Assert.assertFalse(terminatedSomething);
 EasyMock.verify(autoScaler);
@@ -430,7 +443,7 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip"));
+        .andReturn(Lists.newArrayList("ip"));
 EasyMock.replay(autoScaler);
 boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
     runner
@@ -443,17 +456,17 @@ public class SimpleResourceManagementStrategyTest
 EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
 EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
 EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
-        .andReturn(Lists.<String>newArrayList("ip"));
+        .andReturn(Lists.newArrayList("ip"));
 EasyMock.expect(autoScaler.provision()).andReturn(
-    new AutoScalingData(Lists.<String>newArrayList("h3"))
+    new AutoScalingData(Lists.newArrayList("h3"))
 );
 // Should provision two new workers
 EasyMock.expect(autoScaler.provision()).andReturn(
-    new AutoScalingData(Lists.<String>newArrayList("h4"))
+    new AutoScalingData(Lists.newArrayList("h4"))
 );
 EasyMock.replay(autoScaler);
 provisionedSomething = simpleResourceManagementStrategy.doProvision(
     runner
 );
 Assert.assertTrue(provisionedSomething);
 EasyMock.verify(autoScaler);
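Note: the two back-to-back provision() expectations above queue two return values ("h3", then "h4") that EasyMock hands out in call order; the same can be written as one chained expectation. A runnable sketch with a hypothetical Provisioner interface invented for this example:

import org.easymock.EasyMock;

// Hypothetical interface, for illustration only.
interface Provisioner
{
  String provision();
}

public class ConsecutiveReturnsDemo
{
  public static void main(String[] args)
  {
    Provisioner provisioner = EasyMock.createMock(Provisioner.class);

    // Two stubs for the same call are consumed in call order...
    EasyMock.expect(provisioner.provision()).andReturn("h3");
    EasyMock.expect(provisioner.provision()).andReturn("h4");
    // ...equivalently: EasyMock.expect(provisioner.provision()).andReturn("h3").andReturn("h4");
    EasyMock.replay(provisioner);

    System.out.println(provisioner.provision()); // h3
    System.out.println(provisioner.provision()); // h4
    EasyMock.verify(provisioner);
  }
}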
@@ -468,12 +481,12 @@ public class SimpleResourceManagementStrategyTest
 RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
 EasyMock.expect(runner.getPendingTasks()).andReturn(
-    Arrays.asList(
+    Collections.singletonList(
         new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
     )
 ).times(2);
 EasyMock.expect(runner.getWorkers()).andReturn(
-    Arrays.<ZkWorker>asList(
+    Collections.<ZkWorker>singletonList(
         new TestZkWorker(null)
     )
 ).times(1);
@@ -484,7 +497,7 @@ public class SimpleResourceManagementStrategyTest
 );
 boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
     runner
 );
 Assert.assertFalse(terminatedSomething);
View File
@@ -20,13 +20,13 @@
 package io.druid.indexing.overlord.http;

 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Pair;
-import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.guava.CloseQuietly;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
@@ -39,7 +39,6 @@ import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
-import io.druid.indexing.overlord.RemoteTaskRunner;
 import io.druid.indexing.overlord.TaskLockbox;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskRunner;
@@ -47,10 +46,7 @@ import io.druid.indexing.overlord.TaskRunnerFactory;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.TaskStorageQueryAdapter;
-import io.druid.indexing.overlord.ZkWorker;
-import io.druid.indexing.overlord.autoscaling.NoopResourceManagementScheduler;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
 import io.druid.indexing.overlord.config.TaskQueueConfig;
 import io.druid.server.DruidNode;
 import io.druid.server.initialization.IndexerZkConfig;
@@ -157,24 +153,14 @@ public class OverlordResourceTest
     taskActionClientFactory,
     druidNode,
     indexerZkConfig,
-    new TaskRunnerFactory()
+    new TaskRunnerFactory<MockTaskRunner>()
     {
       @Override
-      public TaskRunner build()
+      public MockTaskRunner build()
       {
         return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches);
       }
     },
-    new ResourceManagementSchedulerFactory()
-    {
-      @Override
-      public ResourceManagementScheduler build(
-          RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory
-      )
-      {
-        return new NoopResourceManagementScheduler();
-      }
-    },
     curator,
     new NoopServiceAnnouncer()
     {
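Note: the test now implements TaskRunnerFactory<MockTaskRunner>, which suggests the factory interface was generified so that build() returns the concrete runner type without a downcast. A self-contained sketch of that shape, using local stand-in types rather than the real io.druid.indexing.overlord interfaces:

// Local stand-ins, not the real Druid interfaces.
interface TaskRunner
{
}

interface TaskRunnerFactory<T extends TaskRunner>
{
  T build();
}

class MockTaskRunner implements TaskRunner
{
}

public class GenericFactoryDemo
{
  public static void main(String[] args)
  {
    TaskRunnerFactory<MockTaskRunner> factory = new TaskRunnerFactory<MockTaskRunner>()
    {
      @Override
      public MockTaskRunner build()
      {
        // Callers receive the concrete runner type; no downcast needed.
        return new MockTaskRunner();
      }
    };
    MockTaskRunner runner = factory.build();
    System.out.println(runner.getClass().getSimpleName()); // MockTaskRunner
  }
}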
@@ -195,7 +181,7 @@ public class OverlordResourceTest
     // basic task master lifecycle test
     taskMaster.start();
     announcementLatch.await();
-    while(!taskMaster.isLeading()){
+    while (!taskMaster.isLeading()) {
       // I believe the control will never reach here and thread will never sleep but just to be on safe side
       Thread.sleep(10);
     }
@@ -265,7 +251,8 @@ public class OverlordResourceTest
  * These method will not timeout until the condition is met so calling method should ensure timeout
  * This method also assumes that the task with given taskId is present
  * */
-private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException {
+private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException
+{
   while (true) {
     Response response = overlordResource.getTaskStatus(taskId);
     if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) {
@@ -313,28 +300,28 @@ public class OverlordResourceTest
 (whitespace-only change: the anonymous Callable below is re-indented; its content is identical on both sides)
 {
   final String taskId = task.getId();
   ListenableFuture<TaskStatus> future = MoreExecutors.listeningDecorator(
       Execs.singleThreaded(
           "noop_test_task_exec_%s"
       )
   ).submit(
       new Callable<TaskStatus>()
       {
         @Override
         public TaskStatus call() throws Exception
         {
           // adding of task to list of runningTasks should be done before count down as
           // getRunningTasks may not include the task for which latch has been counted down
           // Count down to let know that task is actually running
           // this is equivalent of getting process holder to run task in ForkingTaskRunner
           runningTasks.add(taskId);
           runLatches[Integer.parseInt(taskId)].countDown();
           // Wait for completion count down
           completionLatches[Integer.parseInt(taskId)].await();
           taskRunnerWorkItems.remove(taskId);
           runningTasks.remove(taskId);
           return TaskStatus.success(taskId);
         }
       }
   );
   TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future);
   taskRunnerWorkItems.put(taskId, taskRunnerWorkItem);
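Note: the comments in MockTaskRunner describe a two-latch handshake: the run latch announces that the task is really running before the test proceeds, and the completion latch holds the task open until the test releases it. A minimal standalone sketch of the same handshake:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchHandshakeDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final CountDownLatch runLatch = new CountDownLatch(1);        // "task is running"
    final CountDownLatch completionLatch = new CountDownLatch(1); // "task may finish"

    ExecutorService exec = Executors.newSingleThreadExecutor();
    exec.submit(new Runnable()
    {
      @Override
      public void run()
      {
        runLatch.countDown(); // announce the task is really running
        try {
          completionLatch.await(); // stay "running" until the test releases us
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });

    runLatch.await();            // the test observes the task running
    completionLatch.countDown(); // the test lets the task complete
    exec.shutdown();
  }
}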
@@ -348,16 +335,16 @@ public class OverlordResourceTest
 (whitespace-only change: the anonymous Function below is re-indented; its content is identical on both sides)
 public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks()
 {
   List runningTaskList = Lists.transform(
       runningTasks,
       new Function<String, TaskRunnerWorkItem>()
       {
         @Nullable
         @Override
         public TaskRunnerWorkItem apply(String input)
         {
           return taskRunnerWorkItems.get(input);
         }
       }
   );
   return runningTaskList;
 }
@@ -375,9 +362,9 @@ public class OverlordResourceTest
     }

     @Override
-    public Collection<ZkWorker> getWorkers()
+    public Optional<ScalingStats> getScalingStats()
     {
-      return ImmutableList.of();
+      return Optional.absent();
     }
   }
 }
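Note: getWorkers() returning an empty list is replaced by getScalingStats() returning Optional.absent(), so a runner with no autoscaler reports "no stats" instead of "no workers", matching the PR's move of resource management into the TaskRunner. A sketch of the contract; ScalingStats here is a local stand-in for io.druid.indexing.overlord.autoscaling.ScalingStats:

import com.google.common.base.Optional;

// Local stand-in for io.druid.indexing.overlord.autoscaling.ScalingStats.
class ScalingStats
{
}

interface ScalingStatsProvider
{
  Optional<ScalingStats> getScalingStats();
}

public class ScalingStatsDemo
{
  public static void main(String[] args)
  {
    ScalingStatsProvider nonScalingRunner = new ScalingStatsProvider()
    {
      @Override
      public Optional<ScalingStats> getScalingStats()
      {
        return Optional.absent(); // no autoscaler attached, so nothing to report
      }
    };
    System.out.println(nonScalingRunner.getScalingStats().isPresent()); // false
  }
}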
View File
@@ -61,8 +61,6 @@ import io.druid.indexing.overlord.TaskRunnerFactory;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.TaskStorageQueryAdapter;
 import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
-import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactoryImpl;
 import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
 import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
 import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
@@ -141,9 +139,6 @@ public class CliOverlord extends ServerRunnable
     binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
     binder.bind(TaskLockbox.class).in(LazySingleton.class);
     binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
-    binder.bind(ResourceManagementSchedulerFactory.class)
-        .to(ResourceManagementSchedulerFactoryImpl.class)
-        .in(LazySingleton.class);
     binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
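Note: with the ResourceManagementSchedulerFactory binding removed, CliOverlord presumably keeps only a strategy-level binding for the runner to consume (the ResourceManagementStrategy imports above survive this change). A hedged Guice sketch of that general shape, using stand-in types rather than Druid's actual module or classes:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

// Stand-in types, not Druid's actual classes.
interface ResourceStrategy
{
}

class SimpleResourceStrategy implements ResourceStrategy
{
}

public class BindingSketch
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        // The scheduler-factory binding is gone; only a strategy-level
        // binding of this general shape would remain for the runner to use.
        bind(ResourceStrategy.class).to(SimpleResourceStrategy.class);
      }
    });
    System.out.println(injector.getInstance(ResourceStrategy.class).getClass().getSimpleName());
  }
}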