Move resource management to be the responsibility of the TaskRunner

Charles Allen 2015-11-11 12:16:03 -08:00
parent 71f554bf80
commit 976d4c965b
24 changed files with 367 additions and 440 deletions

View File

@ -52,6 +52,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.LogUtils;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.query.DruidMetrics;
@ -542,9 +543,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
@Override
public Collection<ZkWorker> getWorkers()
public Optional<ScalingStats> getScalingStats()
{
return ImmutableList.of();
return Optional.absent();
}
@Override

View File

@ -31,8 +31,8 @@ import io.druid.tasklogs.TaskLogPusher;
import java.util.Properties;
/**
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory<ForkingTaskRunner>
{
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
@ -51,7 +51,8 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
final ObjectMapper jsonMapper,
final TaskLogPusher persistentTaskLogs,
@Self DruidNode node
) {
)
{
this.config = config;
this.taskConfig = taskConfig;
this.workerConfig = workerConfig;
@ -62,7 +63,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
}
@Override
public TaskRunner build()
public ForkingTaskRunner build()
{
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
}

View File

@ -52,9 +52,12 @@ import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
@ -102,7 +105,6 @@ import java.util.concurrent.TimeUnit;
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
@ -149,6 +151,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
@ -158,7 +161,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
)
{
this.jsonMapper = jsonMapper;
@ -171,6 +175,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.resourceManagement = resourceManagement;
}
@LifecycleStart
@ -283,6 +288,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
waitingForMonitor.wait();
}
}
resourceManagement.startManagement(this);
started = true;
}
catch (Exception e) {
@ -298,6 +304,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return;
}
started = false;
resourceManagement.stopManagement();
for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
}
@ -314,7 +323,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return ImmutableList.of();
}
@Override
public Collection<ZkWorker> getWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
@ -339,6 +347,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
}
@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.fromNullable(resourceManagement.getStats());
}
public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {

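Hand-wiring the runner now means passing a strategy as the new final constructor argument. Below is a minimal sketch mirroring the factory's build() in the next file and RemoteTaskRunnerTest later in this commit; the exact ordering of the leading parameters is an assumption (the hunks cut them off), and everything here is assumed to be supplied by the caller, as Guice does in production.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;

public class RemoteTaskRunnerWiring
{
  // Sketch only: all parameters are assumed to be provided by the caller.
  public static RemoteTaskRunner build(
      ObjectMapper jsonMapper,
      RemoteTaskRunnerConfig config,
      IndexerZkConfig zkPaths,
      CuratorFramework curator,
      HttpClient httpClient,
      Supplier<WorkerBehaviorConfig> workerConfigRef
  )
  {
    return new RemoteTaskRunner(
        jsonMapper,
        config,
        zkPaths,
        curator,
        new SimplePathChildrenCacheFactory.Builder().build(),
        httpClient,
        workerConfigRef,
        ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
        // The new argument: the no-op strategy disables autoscaling entirely.
        new NoopResourceManagementStrategy<RemoteTaskRunner>()
    );
  }
}

start() then invokes startManagement(this) and stop() invokes stopManagement(), so the strategy lives exactly as long as the runner.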
View File

@ -23,9 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
@ -35,8 +42,9 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final IndexerZkConfig zkPaths;
@ -44,6 +52,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScheduledExecutorService cleanupExec;
private final SimpleResourceManagementConfig config;
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;
@Inject
public RemoteTaskRunnerFactory(
@ -53,7 +64,10 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorFactory factory
final ScheduledExecutorFactory factory,
final SimpleResourceManagementConfig config,
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
final ScheduledExecutorService exec
)
{
this.curator = curator;
@ -62,12 +76,26 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d");
this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d");
this.config = config;
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.exec = exec;
}
@Override
public TaskRunner build()
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,
workerConfigRef,
resourceManagementSchedulerConfig,
exec
);
} else {
resourceManagementStrategy = new NoopResourceManagementStrategy<>();
}
return new RemoteTaskRunner(
jsonMapper,
remoteTaskRunnerConfig,
@ -79,7 +107,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.build(),
httpClient,
workerConfigRef,
cleanupExec
cleanupExec,
resourceManagementStrategy
);
}
}

View File

@ -22,8 +22,6 @@ package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -34,8 +32,7 @@ import io.druid.guice.annotations.Self;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig;
@ -64,7 +61,6 @@ public class TaskMaster
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue;
private volatile ResourceManagementScheduler resourceManagementScheduler;
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
@ -77,7 +73,6 @@ public class TaskMaster
@Self final DruidNode node,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter
@ -118,14 +113,6 @@ public class TaskMaster
.emit();
}
leaderLifecycle.addManagedInstance(taskRunner);
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
@ -285,10 +272,10 @@ public class TaskMaster
}
}
public Optional<ResourceManagementScheduler> getResourceManagementScheduler()
public Optional<ScalingStats> getScalingStats()
{
if (leading) {
return Optional.fromNullable(resourceManagementScheduler);
return taskRunner.getScalingStats();
} else {
return Optional.absent();
}

View File

@ -19,10 +19,12 @@
package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.Pair;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import java.util.Collection;
import java.util.List;
@ -45,7 +47,7 @@ public interface TaskRunner
*
* @return task status, eventually
*/
public ListenableFuture<TaskStatus> run(Task task);
ListenableFuture<TaskStatus> run(Task task);
/**
* Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
@ -53,19 +55,24 @@ public interface TaskRunner
*
* @param taskid task ID to clean up resources for
*/
public void shutdown(String taskid);
void shutdown(String taskid);
/**
* Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling
* stopping, "run" will not accept further tasks.
*/
public void stop();
void stop();
public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
Collection<? extends TaskRunnerWorkItem> getRunningTasks();
public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
Collection<? extends TaskRunnerWorkItem> getPendingTasks();
public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
Collection<? extends TaskRunnerWorkItem> getKnownTasks();
public Collection<ZkWorker> getWorkers();
/**
* Some runners can dynamically scale their capacity up and down. This method returns stats on those scaling activities.
*
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();
}

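To make the post-change shape of the interface concrete, here is a minimal do-nothing implementation. This is a sketch assuming only the methods visible in this diff; the class name is hypothetical and not part of the commit (the test file below uses a similar MockTaskRunner).

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

import java.util.Collection;

public class NoopTaskRunner implements TaskRunner
{
  @Override
  public ListenableFuture<TaskStatus> run(Task task)
  {
    // Pretend every task succeeds immediately.
    return Futures.immediateFuture(TaskStatus.success(task.getId()));
  }

  @Override
  public void shutdown(String taskid)
  {
    // nothing to clean up
  }

  @Override
  public void stop()
  {
    // nothing to stop
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
  {
    return ImmutableList.of();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
  {
    return ImmutableList.of();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
  {
    return ImmutableList.of();
  }

  @Override
  public Optional<ScalingStats> getScalingStats()
  {
    // No autoscaled resource behind this runner.
    return Optional.absent();
  }
}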
View File

@ -19,7 +19,7 @@
package io.druid.indexing.overlord;
public interface TaskRunnerFactory
public interface TaskRunnerFactory<T extends TaskRunner>
{
public TaskRunner build();
T build();
}

View File

@ -19,11 +19,11 @@
package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -33,7 +33,6 @@ import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
@ -42,6 +41,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -204,9 +204,9 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
@Override
public Collection<ZkWorker> getWorkers()
public Optional<ScalingStats> getScalingStats()
{
return Lists.newArrayList();
return Optional.absent();
}
@Override

View File

@ -34,17 +34,17 @@ import java.util.List;
})
public interface AutoScaler<T>
{
public int getMinNumWorkers();
int getMinNumWorkers();
public int getMaxNumWorkers();
int getMaxNumWorkers();
public T getEnvConfig();
T getEnvConfig();
public AutoScalingData provision();
AutoScalingData provision();
public AutoScalingData terminate(List<String> ips);
AutoScalingData terminate(List<String> ips);
public AutoScalingData terminateWithIds(List<String> ids);
AutoScalingData terminateWithIds(List<String> ids);
/**
* Provides a lookup of ip addresses to node ids
@ -53,7 +53,7 @@ public interface AutoScaler<T>
*
* @return node ids
*/
public List<String> ipToIdLookup(List<String> ips);
List<String> ipToIdLookup(List<String> ips);
/**
* Provides a lookup of node ids to ip addresses
@ -62,5 +62,5 @@ public interface AutoScaler<T>
*
* @return IPs associated with the node
*/
public List<String> idToIpLookup(List<String> nodeIds);
List<String> idToIpLookup(List<String> nodeIds);
}

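To make the AutoScaler contract concrete, here is a hypothetical in-memory implementation (the class name and behavior are illustrative, not part of this commit): node ids and IPs are the same strings, so both lookups are identity functions, and AutoScalingData wraps the affected node list the same way the tests later in this commit do.

import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.AutoScalingData;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryAutoScaler implements AutoScaler<Void>
{
  private final List<String> nodes = new ArrayList<>();
  private int counter = 0;

  @Override
  public int getMinNumWorkers()
  {
    return 0;
  }

  @Override
  public int getMaxNumWorkers()
  {
    return 10;
  }

  @Override
  public Void getEnvConfig()
  {
    return null;
  }

  @Override
  public AutoScalingData provision()
  {
    // "Launch" one new node and report it.
    final String node = "node-" + counter++;
    nodes.add(node);
    return new AutoScalingData(Collections.singletonList(node));
  }

  @Override
  public AutoScalingData terminate(List<String> ips)
  {
    // IPs and ids are interchangeable in this fake.
    nodes.removeAll(ips);
    return new AutoScalingData(ips);
  }

  @Override
  public AutoScalingData terminateWithIds(List<String> ids)
  {
    nodes.removeAll(ids);
    return new AutoScalingData(ids);
  }

  @Override
  public List<String> ipToIdLookup(List<String> ips)
  {
    return ips;
  }

  @Override
  public List<String> idToIpLookup(List<String> nodeIds)
  {
    return nodeIds;
  }
}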
View File

@ -1,52 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.logger.Logger;
/**
*/
public class NoopResourceManagementScheduler extends ResourceManagementScheduler
{
private static final Logger log = new Logger(NoopResourceManagementScheduler.class);
public NoopResourceManagementScheduler()
{
super(null, null, null, null);
}
@Override
public void start()
{
log.info("Autoscaling is disabled.");
}
@Override
public void stop()
{
// do nothing
}
@Override
public ScalingStats getStats()
{
return new ScalingStats(0);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.TaskRunner;
public class NoopResourceManagementStrategy<T extends TaskRunner> implements ResourceManagementStrategy<T>
{
@Override
public void startManagement(T runner)
{
}
@Override
public void stopManagement()
{
}
@Override
public ScalingStats getStats()
{
return null;
}
}

View File

@ -1,127 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* The ResourceManagementScheduler does not contain the logic to decide whether provisioning or termination should
* actually occur. That decision is made in the {@link ResourceManagementStrategy}.
*/
public class ResourceManagementScheduler
{
private static final Logger log = new Logger(ResourceManagementScheduler.class);
private final RemoteTaskRunner taskRunner;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ResourceManagementSchedulerConfig config;
private final ScheduledExecutorService exec;
private final Object lock = new Object();
private volatile boolean started = false;
public ResourceManagementScheduler(
RemoteTaskRunner taskRunner,
ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorService exec
)
{
this.taskRunner = taskRunner;
this.resourceManagementStrategy = resourceManagementStrategy;
this.config = config;
this.exec = exec;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
log.info("Started Resource Management Scheduler");
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getProvisionPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doProvision(taskRunner);
}
}
);
// Schedule termination of worker nodes periodically
Period period = config.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(period, config.getOriginTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(System.currentTimeMillis(), startTime),
config.getTerminatePeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doTerminate(taskRunner);
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
started = false;
}
}
public ScalingStats getStats()
{
return resourceManagementStrategy.getStats();
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;
/**
*/
public interface ResourceManagementSchedulerFactory
{
public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory);
}

View File

@ -1,54 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.autoscaling;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.indexing.overlord.RemoteTaskRunner;
/**
*/
public class ResourceManagementSchedulerFactoryImpl implements ResourceManagementSchedulerFactory
{
private final ResourceManagementSchedulerConfig config;
private final ResourceManagementStrategy strategy;
@Inject
public ResourceManagementSchedulerFactoryImpl(
ResourceManagementStrategy strategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorFactory executorFactory
)
{
this.config = config;
this.strategy = strategy;
}
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory)
{
if (config.isDoAutoscale()) {
return new ResourceManagementScheduler(runner, strategy, config, executorFactory.create(1, "ScalingExec--%d"));
}
else {
return new NoopResourceManagementScheduler();
}
}
}

View File

@ -19,17 +19,33 @@
package io.druid.indexing.overlord.autoscaling;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
/**
* The ResourceManagementStrategy decides whether worker nodes should be provisioned or terminated
* based on the available tasks in the system and the state of the workers in the system.
* In general, the resource management is tied to the runner.
*/
public interface ResourceManagementStrategy
public interface ResourceManagementStrategy<T extends TaskRunner>
{
public boolean doProvision(RemoteTaskRunner runner);
/**
* Equivalent to start(), but requires the specific runner instance whose state is of interest.
* This method is intended to be called from the TaskRunner's lifecycle.
*
* @param runner The TaskRunner state holder this strategy should use during execution
*/
void startManagement(T runner);
public boolean doTerminate(RemoteTaskRunner runner);
/**
* Equivalent to stop().
* Should be called from the TaskRunner's lifecycle.
*/
void stopManagement();
public ScalingStats getStats();
/**
* Get any interesting stats related to scaling
*
* @return The ScalingStats or `null` if nothing of interest
*/
ScalingStats getStats();
}

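As a sketch of how the relocated lifecycle is meant to be used, here is a hypothetical strategy that merely logs the transitions its owning runner drives. The class name is illustrative; ScalingStats(0) is the unbounded variant per the constructor shown later in this commit.

import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;

public class LoggingResourceManagementStrategy<T extends TaskRunner> implements ResourceManagementStrategy<T>
{
  private static final Logger log = new Logger(LoggingResourceManagementStrategy.class);
  private final ScalingStats stats = new ScalingStats(0);

  @Override
  public void startManagement(T runner)
  {
    // Invoked from the runner's @LifecycleStart, e.g. RemoteTaskRunner.start() above.
    log.info("Management started for runner [%s]", runner);
  }

  @Override
  public void stopManagement()
  {
    // Invoked from the runner's @LifecycleStop, before the workers are torn down.
    log.info("Management stopped");
  }

  @Override
  public ScalingStats getStats()
  {
    // A real strategy would record PROVISION/TERMINATE events here.
    return stats;
  }
}

The strategy's lifetime is thus bounded by the runner's: the runner calls startManagement(this) on start and stopManagement() on stop, replacing the separately managed ResourceManagementScheduler.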
View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import org.joda.time.DateTime;
import java.util.Collections;
@ -33,20 +34,20 @@ import java.util.List;
*/
public class ScalingStats
{
public static enum EVENT
public enum EVENT
{
PROVISION,
TERMINATE
}
private static final Comparator<ScalingEvent> comparator = new Comparator<ScalingEvent>()
private static final Comparator<ScalingEvent> COMPARATOR = new Ordering<ScalingEvent>()
{
@Override
public int compare(ScalingEvent s1, ScalingEvent s2)
{
return -s1.getTimestamp().compareTo(s2.getTimestamp());
return s2.getTimestamp().compareTo(s1.getTimestamp());
}
};
}.nullsLast();
private final Object lock = new Object();
@ -55,10 +56,10 @@ public class ScalingStats
public ScalingStats(int capacity)
{
if (capacity == 0) {
this.recentEvents = MinMaxPriorityQueue.orderedBy(comparator).create();
this.recentEvents = MinMaxPriorityQueue.orderedBy(COMPARATOR).create();
} else {
this.recentEvents = MinMaxPriorityQueue
.orderedBy(comparator)
.orderedBy(COMPARATOR)
.maximumSize(capacity)
.create();
}
@ -95,7 +96,7 @@ public class ScalingStats
{
synchronized (lock) {
List<ScalingEvent> retVal = Lists.newArrayList(recentEvents);
Collections.sort(retVal, comparator);
Collections.sort(retVal, COMPARATOR);
return retVal;
}
}

View File

@ -99,6 +99,7 @@ public class SimpleResourceManagementConfig
return this;
}
// Avoid using this if possible. Assuming all workers have the same port is bad for containers.
public int getWorkerPort()
{
return workerPort;

View File

@ -29,25 +29,33 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<RemoteTaskRunner>
{
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;
private volatile boolean started = false;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
@ -63,19 +71,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Inject
public SimpleResourceManagementStrategy(
SimpleResourceManagementConfig config,
Supplier<WorkerBehaviorConfig> workerConfigRef
Supplier<WorkerBehaviorConfig> workerConfigRef,
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
ScheduledExecutorService exec
)
{
this.config = config;
this.workerConfigRef = workerConfigRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.exec = exec;
}
@Override
public boolean doProvision(RemoteTaskRunner runner)
boolean doProvision(RemoteTaskRunner runner)
{
Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ZkWorker> zkWorkers = runner.getWorkers();
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ZkWorker> zkWorkers = getWorkers(runner);
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@ -140,10 +151,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
@Override
public boolean doTerminate(RemoteTaskRunner runner)
boolean doTerminate(RemoteTaskRunner runner)
{
Collection<RemoteTaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
synchronized (lock) {
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
@ -179,7 +189,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
Collection<ZkWorker> workers = runner.getWorkers();
Collection<ZkWorker> workers = getWorkers(runner);
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) {
@ -237,6 +247,70 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
@Override
public void startManagement(final RemoteTaskRunner runner)
{
synchronized (lock) {
if (started) {
return;
}
log.info("Started Resource Management Scheduler");
ScheduledExecutors.scheduleAtFixedRate(
exec,
resourceManagementSchedulerConfig.getProvisionPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
doProvision(runner);
}
}
);
// Schedule termination of worker nodes periodically
Period period = resourceManagementSchedulerConfig.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(
period,
resourceManagementSchedulerConfig.getOriginTime(),
null
);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(System.currentTimeMillis(), startTime),
resourceManagementSchedulerConfig.getTerminatePeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
doTerminate(runner);
}
}
);
started = true;
}
}
@Override
public void stopManagement()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
started = false;
}
}
@Override
public ScalingStats getStats()
{
@ -281,7 +355,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)
{
@ -358,7 +432,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
private boolean hasTaskPendingBeyondThreshold(Collection<? extends TaskRunnerWorkItem> pendingTasks)
{
synchronized (lock) {
long now = System.currentTimeMillis();
@ -372,4 +446,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return false;
}
}
public Collection<ZkWorker> getWorkers(RemoteTaskRunner runner)
{
return runner.getWorkers();
}
}

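The start-time computation in startManagement() above aligns the first termination check to a terminatePeriod boundary: truncate "now" to the boundary at or before it, then step to the next boundary. A standalone sketch, assuming a five-minute period and a null origin:

import io.druid.granularity.PeriodGranularity;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

public class TerminateAlignmentExample
{
  public static void main(String[] args)
  {
    final Period period = new Period("PT5M"); // assumed terminatePeriod
    final PeriodGranularity granularity = new PeriodGranularity(period, null, null);
    final long now = new DateTime().getMillis();
    // truncate() snaps to the boundary at or before "now"; next() advances one full period.
    final long startTime = granularity.next(granularity.truncate(now));
    System.out.println("first terminate check in " + new Duration(now, startTime));
  }
}

At 12:03 with a PT5M period this yields 12:05, so provisioning checks start immediately while termination checks fire on period edges.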
View File

@ -39,12 +39,13 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
@ -409,7 +410,18 @@ public class OverlordResource
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getWorkers()).build();
if (taskRunner instanceof RemoteTaskRunner) {
return Response.ok(((RemoteTaskRunner) taskRunner).getWorkers()).build();
} else {
log.debug(
"Task runner [%s] of type [%s] does not support listing workers",
taskRunner,
taskRunner.getClass().getCanonicalName()
);
return Response.serverError()
.entity(ImmutableMap.of("error", "Task Runner does not support worker listing"))
.build();
}
}
}
);
@ -421,9 +433,9 @@ public class OverlordResource
public Response getScalingState()
{
// Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler.
final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
final Optional<ScalingStats> rms = taskMaster.getScalingStats();
if (rms.isPresent()) {
return Response.ok(rms.get().getStats()).build();
return Response.ok(rms.get()).build();
} else {
return Response.ok().build();
}
@ -563,7 +575,7 @@ public class OverlordResource
}
if (status.isPresent()) {
data.put("statusCode", status.get().getStatusCode().toString());
if(status.get().isComplete()) {
if (status.get().isComplete()) {
data.put("duration", status.get().getDuration());
}
}

View File

@ -48,7 +48,7 @@ public interface WorkerSelectStrategy
*
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
*/
public Optional<ImmutableZkWorker> findWorkerForTask(
Optional<ImmutableZkWorker> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task

View File

@ -41,6 +41,9 @@ import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
@ -471,7 +474,8 @@ public class RemoteTaskRunnerTest
new SimplePathChildrenCacheFactory.Builder().build(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d")
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<RemoteTaskRunner>()
);
remoteTaskRunner.start();

View File

@ -27,6 +27,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.task.NoopTask;
@ -45,6 +46,7 @@ import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -53,6 +55,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -61,9 +64,9 @@ public class SimpleResourceManagementStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
private SimpleResourceManagementConfig simpleResourceManagementConfig;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
@Before
public void setUp() throws Exception
@ -75,7 +78,7 @@ public class SimpleResourceManagementStrategyTest
testTask = new TestMergeTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
Lists.newArrayList(
new DataSegment(
"dummyDs",
new Interval("2012-01-01/2012-01-02"),
@ -92,13 +95,15 @@ public class SimpleResourceManagementStrategyTest
indexSpec
);
simpleResourceManagementConfig = new SimpleResourceManagementConfig()
final SimpleResourceManagementConfig simpleResourceManagementConfig = new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion("");
final ResourceManagementSchedulerConfig schedulerConfig = new ResourceManagementSchedulerConfig();
workerConfig = new AtomicReference<>(
new WorkerBehaviorConfig(
null,
@ -108,10 +113,18 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
simpleResourceManagementConfig,
DSuppliers.of(workerConfig)
DSuppliers.of(workerConfig),
schedulerConfig,
executorService
);
}
@After
public void tearDown()
{
executorService.shutdownNow();
}
@Test
public void testSuccessfulProvision() throws Exception
{
@ -120,16 +133,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("aNode"))
new AutoScalingData(Lists.newArrayList("aNode"))
);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
);
@ -156,16 +169,16 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
new AutoScalingData(Lists.newArrayList("fake"))
);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
).times(2);
@ -212,17 +225,17 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null);
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
new AutoScalingData(Lists.newArrayList("fake"))
);
EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
).times(2);
@ -268,17 +281,17 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
);
@ -302,26 +315,26 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
.andReturn(Lists.newArrayList("ip")).times(2);
EasyMock.expect(autoScaler.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip"))
new AutoScalingData(Lists.newArrayList("ip"))
);
EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList(
new TestZkWorker(testTask)
)
);
@ -354,12 +367,12 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
@ -370,7 +383,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
@ -384,7 +397,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(runner);
@ -402,19 +415,19 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.<RemoteTaskRunnerWorkItem>asList()
Collections.<RemoteTaskRunnerWorkItem>emptyList()
).times(3);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
@ -430,7 +443,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
.andReturn(Lists.newArrayList("ip"));
EasyMock.replay(autoScaler);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
runner
@ -443,13 +456,13 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
.andReturn(Lists.newArrayList("ip"));
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
new AutoScalingData(Lists.newArrayList("h3"))
);
// Should provision two new workers
EasyMock.expect(autoScaler.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
new AutoScalingData(Lists.newArrayList("h4"))
);
EasyMock.replay(autoScaler);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
@ -468,12 +481,12 @@ public class SimpleResourceManagementStrategyTest
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getPendingTasks()).andReturn(
Arrays.asList(
Collections.singletonList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList(
Collections.<ZkWorker>singletonList(
new TestZkWorker(null)
)
).times(1);

View File

@ -20,13 +20,13 @@
package io.druid.indexing.overlord.http;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -39,7 +39,6 @@ import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunner;
@ -47,10 +46,7 @@ import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig;
@ -156,24 +152,14 @@ public class OverlordResourceTest
taskActionClientFactory,
druidNode,
indexerZkConfig,
new TaskRunnerFactory()
new TaskRunnerFactory<MockTaskRunner>()
{
@Override
public TaskRunner build()
public MockTaskRunner build()
{
return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches);
}
},
new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(
RemoteTaskRunner runner, ScheduledExecutorFactory executorFactory
)
{
return new NoopResourceManagementScheduler();
}
},
curator,
new NoopServiceAnnouncer()
{
@ -194,7 +180,7 @@ public class OverlordResourceTest
// basic task master lifecycle test
taskMaster.start();
announcementLatch.await();
while(!taskMaster.isLeading()){
while (!taskMaster.isLeading()) {
// Control should never reach here (the thread should never actually sleep), but sleep briefly to be safe.
Thread.sleep(10);
}
@ -264,7 +250,8 @@ public class OverlordResourceTest
* This method will not time out until the condition is met, so the calling method should enforce a timeout.
* It also assumes that a task with the given taskId is present.
*/
private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException {
private void waitForTaskStatus(String taskId, TaskStatus.Status status) throws InterruptedException
{
while (true) {
Response response = overlordResource.getTaskStatus(taskId);
if (status.equals(((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode())) {
@ -374,9 +361,9 @@ public class OverlordResourceTest
}
@Override
public Collection<ZkWorker> getWorkers()
public Optional<ScalingStats> getScalingStats()
{
return ImmutableList.of();
return Optional.absent();
}
}
}

View File

@ -61,8 +61,6 @@ import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactoryImpl;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
@ -141,9 +139,6 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)
.to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));