Use Worker instead of ZkWorker whenever possible

* Moves last-run task state information to Worker
* Makes WorkerTaskRunner a TaskRunner that exposes methods for getting information about Workers
Charles Allen 2016-01-11 15:48:48 -08:00
parent 499288ff4b
commit ac13a5942a
22 changed files with 715 additions and 141 deletions
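
Before the per-file changes, a minimal sketch (not part of this commit) of what the new surface looks like to autoscaling code: it can work against the WorkerTaskRunner interface and plain Worker metadata instead of ZkWorker. The class name and the idleMillis parameter below are hypothetical; the Worker and WorkerTaskRunner methods are the ones introduced in the files below.

package io.druid.indexing.overlord.autoscaling;

import com.google.common.base.Predicate;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.worker.Worker;

import java.util.Collection;

public class LazyWorkerSketch
{
  // Marks up to maxWorkers idle workers as lazy so an autoscaler can reap them.
  public static Collection<Worker> markIdleWorkersLazy(
      final WorkerTaskRunner runner,
      final long idleMillis,
      final int maxWorkers
  )
  {
    final Predicate<Worker> isIdle = new Predicate<Worker>()
    {
      @Override
      public boolean apply(Worker worker)
      {
        // The last-completed-task time now lives on Worker itself, not on ZkWorker.
        return System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() >= idleMillis;
      }
    };
    return runner.markWorkersLazy(isIdle, maxWorkers);
  }
}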

View File

@ -578,6 +578,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
return Optional.absent(); return Optional.absent();
} }
@Override
public void start()
{
// No state setup required
}
@Override @Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{ {

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -29,6 +30,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -42,7 +44,6 @@ import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.RE; import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
@ -53,7 +54,6 @@ import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils; import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
@ -113,7 +113,7 @@ import java.util.concurrent.TimeUnit;
* <p/> * <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. * The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/ */
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{ {
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class); private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
@ -153,7 +153,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec; private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement; private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;
public RemoteTaskRunner( public RemoteTaskRunner(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
HttpClient httpClient, HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef, Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec, ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
@ -180,6 +180,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.resourceManagement = resourceManagement; this.resourceManagement = resourceManagement;
} }
@Override
@LifecycleStart @LifecycleStart
public void start() public void start()
{ {
@ -298,6 +299,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
} }
@Override
@LifecycleStop @LifecycleStop
public void stop() public void stop()
{ {
@ -325,7 +327,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return ImmutableList.of(); return ImmutableList.of();
} }
public Collection<ZkWorker> getWorkers() @Override
public Collection<Worker> getWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}
public Collection<ZkWorker> getZkWorkers()
{ {
return ImmutableList.copyOf(zkWorkers.values()); return ImmutableList.copyOf(zkWorkers.values());
} }
@ -1018,7 +1026,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskRunnerWorkItem.setResult(taskStatus); taskRunnerWorkItem.setResult(taskStatus);
} }
public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers) @Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
{ {
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) { synchronized (statusLock) {
@ -1027,7 +1036,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
String worker = iterator.next(); String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker); ZkWorker zkWorker = zkWorkers.get(worker);
try { try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) { if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker); lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) { if (lazyWorkers.size() == maxWorkers) {
@ -1040,13 +1049,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
return ImmutableList.copyOf(lazyWorkers.values()); return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
} }
} }
private List<String> getAssignedTasks(Worker worker) throws Exception protected List<String> getAssignedTasks(Worker worker) throws Exception
{ {
List<String> assignedTasks = Lists.newArrayList( final List<String> assignedTasks = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost())) cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
); );
@ -1066,9 +1075,25 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return assignedTasks; return assignedTasks;
} }
public List<ZkWorker> getLazyWorkers() @Override
public Collection<Worker> getLazyWorkers()
{ {
return ImmutableList.copyOf(lazyWorkers.values()); return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
@Override
public Worker apply(ZkWorker input)
{
return input.getWorker();
}
}
);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledExecutorService;
*/ */
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner> public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{ {
public static final String TYPE_NAME = "remote";
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class); private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator; private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
@ -83,7 +84,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
@Override @Override
public RemoteTaskRunner build() public RemoteTaskRunner build()
{ {
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy; final ResourceManagementStrategy<WorkerTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) { if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy( resourceManagementStrategy = new SimpleResourceManagementStrategy(
config, config,

View File

@ -31,6 +31,7 @@ import java.util.List;
/** /**
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}. * Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
* Holds runner state, which is initialized via start()
*/ */
public interface TaskRunner public interface TaskRunner
{ {
@ -75,4 +76,9 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise * @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/ */
Optional<ScalingStats> getScalingStats(); Optional<ScalingStats> getScalingStats();
/**
* Initialize any internal state the runner needs before it accepts tasks
*/
void start();
} }

View File

@ -102,6 +102,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
); );
} }
@Override
@LifecycleStop @LifecycleStop
public void stop() public void stop()
{ {
@ -259,6 +260,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return Optional.absent(); return Optional.absent();
} }
@Override
public void start()
{
// No state startup required
}
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{ {

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Predicate;
import io.druid.indexing.worker.Worker;
import java.util.Collection;
public interface WorkerTaskRunner extends TaskRunner
{
/**
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();
/**
* Return a list of workers who can be reaped by autoscaling
* @return Workers which can be reaped by autoscaling
*/
Collection<Worker> getLazyWorkers();
/**
* Check which workers can be marked as lazy
* @param isLazyWorker predicate used to decide whether an otherwise eligible worker may be marked lazy
* @param maxWorkers maximum number of workers to mark as lazy
* @return the workers that were marked lazy
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
}
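
A hedged illustration of why this interface helps testing: with plain Worker values (instead of ZkWorker, which needs a PathChildrenCache), a strategy test can mock the runner directly, much as SimpleResourceManagementStrategyTest does later in this diff. The class and test names here are hypothetical.

package io.druid.indexing.overlord.autoscaling;

import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.worker.Worker;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

public class WorkerTaskRunnerMockSketch
{
  @Test
  public void testMockedWorkers()
  {
    final WorkerTaskRunner runner = EasyMock.createMock(WorkerTaskRunner.class);
    EasyMock.expect(runner.getWorkers()).andReturn(
        Collections.singletonList(new Worker("host", "1.2.3.4", 3, "v1", DateTime.now()))
    );
    EasyMock.replay(runner);

    Assert.assertEquals(1, runner.getWorkers().size());
    EasyMock.verify(runner);
  }
}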

View File

@ -50,7 +50,6 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter; private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker; private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{ {
@ -129,7 +128,7 @@ public class ZkWorker implements Closeable
@JsonProperty @JsonProperty
public DateTime getLastCompletedTaskTime() public DateTime getLastCompletedTaskTime()
{ {
return lastCompletedTaskTime.get(); return worker.get().getLastCompletedTaskTime();
} }
public boolean isRunningTask(String taskId) public boolean isRunningTask(String taskId)
@ -139,7 +138,7 @@ public class ZkWorker implements Closeable
public boolean isValidVersion(String minVersion) public boolean isValidVersion(String minVersion)
{ {
return worker.get().getVersion().compareTo(minVersion) >= 0; return worker.get().isValidVersion(minVersion);
} }
public void setWorker(Worker newWorker) public void setWorker(Worker newWorker)
@ -153,7 +152,7 @@ public class ZkWorker implements Closeable
public void setLastCompletedTaskTime(DateTime completedTaskTime) public void setLastCompletedTaskTime(DateTime completedTaskTime)
{ {
lastCompletedTaskTime.set(completedTaskTime); worker.get().setLastCompletedTaskTime(completedTaskTime);
} }
public ImmutableZkWorker toImmutable() public ImmutableZkWorker toImmutable()
@ -172,7 +171,6 @@ public class ZkWorker implements Closeable
{ {
return "ZkWorker{" + return "ZkWorker{" +
"worker=" + worker + "worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}'; '}';
} }
} }

View File

@ -24,6 +24,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Collections2; import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -33,10 +34,10 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity; import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
@ -48,7 +49,7 @@ import java.util.concurrent.ScheduledExecutorService;
/** /**
*/ */
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<RemoteTaskRunner> public class SimpleResourceManagementStrategy implements ResourceManagementStrategy<WorkerTaskRunner>
{ {
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
@ -99,10 +100,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
this.exec = exec; this.exec = exec;
} }
boolean doProvision(RemoteTaskRunner runner) boolean doProvision(WorkerTaskRunner runner)
{ {
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks(); Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<ZkWorker> zkWorkers = getWorkers(runner); Collection<Worker> workers = getWorkers(runner);
synchronized (lock) { synchronized (lock) {
boolean didProvision = false; boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@ -110,19 +111,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerConfig available, cannot provision new workers."); log.warn("No workerConfig available, cannot provision new workers.");
return false; return false;
} }
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config); final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup( final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList( Lists.newArrayList(
Iterables.<ZkWorker, String>transform( Iterables.transform(
zkWorkers, workers,
new Function<ZkWorker, String>() new Function<Worker, String>()
{ {
@Override @Override
public String apply(ZkWorker input) public String apply(Worker input)
{ {
return input.getWorker().getIp(); return input.getIp();
} }
} }
) )
@ -130,7 +131,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
); );
currentlyProvisioning.removeAll(workerNodeIds); currentlyProvisioning.removeAll(workerNodeIds);
updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers); updateTargetWorkerCount(workerConfig, pendingTasks, workers);
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) { while (want > 0) {
@ -167,7 +168,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
} }
boolean doTerminate(RemoteTaskRunner runner) boolean doTerminate(WorkerTaskRunner runner)
{ {
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks(); Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
synchronized (lock) { synchronized (lock) {
@ -183,12 +184,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Lists.newArrayList( Lists.newArrayList(
Iterables.transform( Iterables.transform(
runner.getLazyWorkers(), runner.getLazyWorkers(),
new Function<ZkWorker, String>() new Function<Worker, String>()
{ {
@Override @Override
public String apply(ZkWorker input) public String apply(Worker input)
{ {
return input.getWorker().getIp(); return input.getIp();
} }
} }
) )
@ -205,23 +206,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear(); currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting); currentlyTerminating.addAll(stillExisting);
Collection<ZkWorker> workers = getWorkers(runner); Collection<Worker> workers = getWorkers(runner);
updateTargetWorkerCount(workerConfig, pendingTasks, workers); updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) { if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) { if (excessWorkers > 0) {
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config); final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps = final Collection<String> laziestWorkerIps =
Lists.transform( Collections2.transform(
runner.markWorkersLazy(isLazyWorker, excessWorkers), runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>() new Function<Worker, String>()
{ {
@Override @Override
public String apply(ZkWorker zkWorker) public String apply(Worker zkWorker)
{ {
return zkWorker.getWorker().getIp(); return zkWorker.getIp();
} }
} }
); );
@ -235,7 +236,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
Joiner.on(", ").join(laziestWorkerIps) Joiner.on(", ").join(laziestWorkerIps)
); );
final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps); final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps));
if (terminated != null) { if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds()); currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime(); lastTerminateTime = new DateTime();
@ -264,7 +265,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
@Override @Override
public void startManagement(final RemoteTaskRunner runner) public void startManagement(final WorkerTaskRunner runner)
{ {
synchronized (lock) { synchronized (lock) {
if (started) { if (started) {
@ -333,16 +334,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats; return scalingStats;
} }
private static Predicate<ZkWorker> createLazyWorkerPredicate( private static Predicate<Worker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config final SimpleResourceManagementConfig config
) )
{ {
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config); final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
return new Predicate<ZkWorker>() return new Predicate<Worker>()
{ {
@Override @Override
public boolean apply(ZkWorker worker) public boolean apply(Worker worker)
{ {
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); >= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
@ -351,20 +352,20 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}; };
} }
private static Predicate<ZkWorker> createValidWorkerPredicate( private static Predicate<Worker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config final SimpleResourceManagementConfig config
) )
{ {
return new Predicate<ZkWorker>() return new Predicate<Worker>()
{ {
@Override @Override
public boolean apply(ZkWorker zkWorker) public boolean apply(Worker worker)
{ {
final String minVersion = config.getWorkerVersion(); final String minVersion = config.getWorkerVersion();
if (minVersion == null) { if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database."); throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
} }
return zkWorker.isValidVersion(minVersion); return worker.isValidVersion(minVersion);
} }
}; };
} }
@ -372,15 +373,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private void updateTargetWorkerCount( private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig, final WorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks, final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers final Collection<Worker> zkWorkers
) )
{ {
synchronized (lock) { synchronized (lock) {
final Collection<ZkWorker> validWorkers = Collections2.filter( final Collection<Worker> validWorkers = Collections2.filter(
zkWorkers, zkWorkers,
createValidWorkerPredicate(config) createValidWorkerPredicate(config)
); );
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config); final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
@ -463,7 +464,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
} }
public Collection<ZkWorker> getWorkers(RemoteTaskRunner runner) public Collection<Worker> getWorkers(WorkerTaskRunner runner)
{ {
return runner.getWorkers(); return runner.getWorkers();
} }

View File

@ -72,4 +72,43 @@ public class RemoteTaskRunnerConfig
{ {
return taskShutdownLinkTimeout; return taskShutdownLinkTimeout;
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RemoteTaskRunnerConfig that = (RemoteTaskRunnerConfig) o;
if (getMaxZnodeBytes() != that.getMaxZnodeBytes()) {
return false;
}
if (!getTaskAssignmentTimeout().equals(that.getTaskAssignmentTimeout())) {
return false;
}
if (!getTaskCleanupTimeout().equals(that.getTaskCleanupTimeout())) {
return false;
}
if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) {
return false;
}
return getTaskShutdownLinkTimeout().equals(that.getTaskShutdownLinkTimeout());
}
@Override
public int hashCode()
{
int result = getTaskAssignmentTimeout().hashCode();
result = 31 * result + getTaskCleanupTimeout().hashCode();
result = 31 * result + getMinWorkerVersion().hashCode();
result = 31 * result + getMaxZnodeBytes();
result = 31 * result + getTaskShutdownLinkTimeout().hashCode();
return result;
}
} }

View File

@ -21,6 +21,9 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* A container for worker metadata. * A container for worker metadata.
@ -31,19 +34,24 @@ public class Worker
private final String ip; private final String ip;
private final int capacity; private final int capacity;
private final String version; private final String version;
private final AtomicReference<DateTime> lastCompletedTaskTime;
@JsonCreator @JsonCreator
public Worker( public Worker(
@JsonProperty("host") String host, @JsonProperty("host") String host,
@JsonProperty("ip") String ip, @JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity, @JsonProperty("capacity") int capacity,
@JsonProperty("version") String version @JsonProperty("version") String version,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
) )
{ {
this.host = host; this.host = host;
this.ip = ip; this.ip = ip;
this.capacity = capacity; this.capacity = capacity;
this.version = version; this.version = version;
this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null
? DateTime.now()
: lastCompletedTaskTime);
} }
@JsonProperty @JsonProperty
@ -70,6 +78,22 @@ public class Worker
return version; return version;
} }
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
}
public boolean isValidVersion(final String minVersion)
{
return getVersion().compareTo(minVersion) >= 0;
}
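
A quick, hypothetical illustration (not part of the diff) of the methods added above, assuming the five-argument constructor introduced in this commit:

import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;

public class WorkerUsageSketch
{
  public static void main(String[] args)
  {
    final Worker worker = new Worker("host", "1.2.3.4", 3, "0.8.3", DateTime.now());
    // isValidVersion replaces the inline version.compareTo(minVersion) >= 0 check that ZkWorker used to do.
    System.out.println(worker.isValidVersion("0.8.2")); // true: "0.8.3" compares >= "0.8.2"
    // The last-completed-task time now lives on Worker and can be updated in place.
    worker.setLastCompletedTaskTime(DateTime.now());
  }
}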
@Override @Override
public String toString() public String toString()
{ {

View File

@ -27,10 +27,11 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.ForkingTaskRunner; import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.tasklogs.TaskLogStreamer;
import javax.ws.rs.DefaultValue; import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET; import javax.ws.rs.GET;
@ -52,20 +53,18 @@ public class WorkerResource
private static String DISABLED_VERSION = ""; private static String DISABLED_VERSION = "";
private final Worker enabledWorker; private final Worker enabledWorker;
private final Worker disabledWorker;
private final WorkerCuratorCoordinator curatorCoordinator; private final WorkerCuratorCoordinator curatorCoordinator;
private final ForkingTaskRunner taskRunner; private final TaskRunner taskRunner;
@Inject @Inject
public WorkerResource( public WorkerResource(
Worker worker, Worker worker,
WorkerCuratorCoordinator curatorCoordinator, WorkerCuratorCoordinator curatorCoordinator,
ForkingTaskRunner taskRunner TaskRunner taskRunner
) throws Exception ) throws Exception
{ {
this.enabledWorker = worker; this.enabledWorker = worker;
this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION);
this.curatorCoordinator = curatorCoordinator; this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner; this.taskRunner = taskRunner;
} }
@ -77,6 +76,13 @@ public class WorkerResource
public Response doDisable() public Response doDisable()
{ {
try { try {
final Worker disabledWorker = new Worker(
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getLastCompletedTaskTime()
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker); curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
} }
@ -164,18 +170,26 @@ public class WorkerResource
@QueryParam("offset") @DefaultValue("0") long offset @QueryParam("offset") @DefaultValue("0") long offset
) )
{ {
final Optional<ByteSource> stream = taskRunner.streamTaskLog(taskid, offset); if (!(taskRunner instanceof TaskLogStreamer)) {
return Response.status(501)
.entity(String.format(
"Log streaming not supported by [%s]",
taskRunner.getClass().getCanonicalName()
))
.build();
}
try {
final Optional<ByteSource> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskid, offset);
if (stream.isPresent()) { if (stream.isPresent()) {
try {
return Response.ok(stream.get().openStream()).build(); return Response.ok(stream.get().openStream()).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
} }
catch (IOException e) { catch (IOException e) {
log.warn(e, "Failed to read log for task: %s", taskid); log.warn(e, "Failed to read log for task: %s", taskid);
return Response.serverError().build(); return Response.serverError().build();
} }
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
} }
} }
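
The helper below is a hypothetical restatement (not in this commit) of the capability check WorkerResource now performs: log streaming is attempted only when the injected TaskRunner also implements TaskLogStreamer, as ForkingTaskRunner does; otherwise the resource answers HTTP 501.

import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.tasklogs.TaskLogStreamer;

import java.io.IOException;

public class TaskLogCapabilitySketch
{
  // Returns a log stream if the runner supports streaming, Optional.absent() otherwise.
  public static Optional<ByteSource> tryStreamTaskLog(TaskRunner runner, String taskId, long offset)
      throws IOException
  {
    if (runner instanceof TaskLogStreamer) {
      return ((TaskLogStreamer) runner).streamTaskLog(taskId, offset);
    }
    return Optional.absent();
  }
}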

View File

@ -56,6 +56,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -358,7 +359,7 @@ public class RemoteTaskRunnerTest
doSetup(); doSetup();
final Set<String> existingTasks = Sets.newHashSet(); final Set<String> existingTasks = Sets.newHashSet();
for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) { for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet()); existingTasks.addAll(zkWorker.getRunningTasks().keySet());
} }
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
@ -450,7 +451,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
// Confirm RTR thinks the worker is disabled. // Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion()); Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion());
} }
private void doSetup() throws Exception private void doSetup() throws Exception
@ -479,7 +480,7 @@ public class RemoteTaskRunnerTest
null, null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<RemoteTaskRunner>() new NoopResourceManagementStrategy<WorkerTaskRunner>()
); );
remoteTaskRunner.start(); remoteTaskRunner.start();
@ -491,7 +492,8 @@ public class RemoteTaskRunnerTest
"worker", "worker",
"localhost", "localhost",
3, 3,
"0" "0",
DateTime.now()
); );
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@ -504,7 +506,7 @@ public class RemoteTaskRunnerTest
{ {
cf.setData().forPath( cf.setData().forPath(
announcementsPath, announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "")) jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now()))
); );
} }
@ -575,11 +577,11 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task); remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId())); Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task); mockWorkerRunningTask(task);
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy( Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>() new Predicate<Worker>()
{ {
@Override @Override
public boolean apply(ZkWorker input) public boolean apply(Worker input)
{ {
return true; return true;
} }
@ -596,11 +598,11 @@ public class RemoteTaskRunnerTest
doSetup(); doSetup();
remoteTaskRunner.run(task); remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId())); Assert.assertTrue(taskAnnounced(task.getId()));
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy( Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>() new Predicate<Worker>()
{ {
@Override @Override
public boolean apply(ZkWorker input) public boolean apply(Worker input)
{ {
return true; return true;
} }
@ -615,11 +617,11 @@ public class RemoteTaskRunnerTest
public void testFindLazyWorkerNotRunningAnyTask() throws Exception public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{ {
doSetup(); doSetup();
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy( Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>() new Predicate<Worker>()
{ {
@Override @Override
public boolean apply(ZkWorker input) public boolean apply(Worker input)
{ {
return true; return true;
} }

View File

@ -142,8 +142,8 @@ public class SimpleResourceManagementStrategyTest
) )
); );
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
); );
EasyMock.replay(runner); EasyMock.replay(runner);
@ -178,8 +178,8 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
).times(2); ).times(2);
EasyMock.replay(runner); EasyMock.replay(runner);
@ -235,8 +235,8 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
).times(2); ).times(2);
EasyMock.replay(runner); EasyMock.replay(runner);
@ -286,16 +286,16 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
).times(2); ).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn( EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList( Collections.<Worker>singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
); );
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner); EasyMock.replay(runner);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner);
@ -328,14 +328,14 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
).times(2); ).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn( EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>singletonList( Collections.singletonList(
new TestZkWorker(testTask) new TestZkWorker(testTask).getWorker()
) )
); );
EasyMock.replay(runner); EasyMock.replay(runner);
@ -377,14 +377,14 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.<ZkWorker>asList( Arrays.asList(
new TestZkWorker(NoopTask.create()), new TestZkWorker(NoopTask.create()).getWorker(),
new TestZkWorker(NoopTask.create()) new TestZkWorker(NoopTask.create()).getWorker()
) )
).times(2); ).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn( EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList() Collections.<Worker>emptyList()
); );
EasyMock.replay(runner); EasyMock.replay(runner);
@ -422,13 +422,13 @@ public class SimpleResourceManagementStrategyTest
Collections.<RemoteTaskRunnerWorkItem>emptyList() Collections.<RemoteTaskRunnerWorkItem>emptyList()
).times(3); ).times(3);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.<Worker>singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0") new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker()
) )
).times(3); ).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ZkWorker>>anyObject(), EasyMock.anyInt())).andReturn( EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList() Collections.<Worker>emptyList()
); );
EasyMock.replay(runner); EasyMock.replay(runner);
@ -486,8 +486,8 @@ public class SimpleResourceManagementStrategyTest
) )
).times(2); ).times(2);
EasyMock.expect(runner.getWorkers()).andReturn( EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<ZkWorker>singletonList( Collections.<Worker>singletonList(
new TestZkWorker(null) new TestZkWorker(null).getWorker()
) )
).times(1); ).times(1);
EasyMock.replay(runner); EasyMock.replay(runner);
@ -525,7 +525,7 @@ public class SimpleResourceManagementStrategyTest
String version String version
) )
{ {
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper()); super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper());
this.testTask = testTask; this.testTask = testTask;
} }

View File

@ -0,0 +1,386 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class RemoteTaskRunnerConfigTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final Period DEFAULT_TIMEOUT = Period.ZERO;
private static final String DEFAULT_VERSION = "";
private static final long DEFAULT_MAX_ZNODE = 10 * 1024;
@Test
public void testGetTaskAssignmentTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getTaskAssignmentTimeout()
);
}
@Test
public void testGetTaskCleanupTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getTaskCleanupTimeout()
);
}
@Test
public void testGetMinWorkerVersion() throws Exception
{
final String version = "some version";
Assert.assertEquals(
version,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
version,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getMinWorkerVersion()
);
}
@Test
public void testGetMaxZnodeBytes() throws Exception
{
final long max = 20 * 1024;
Assert.assertEquals(
max,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
max,
DEFAULT_TIMEOUT
)).getMaxZnodeBytes()
);
}
@Test
public void testGetTaskShutdownLinkTimeout() throws Exception
{
final Period timeout = Period.hours(1);
Assert.assertEquals(
timeout,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
timeout
)).getTaskShutdownLinkTimeout()
);
}
@Test
public void testEquals() throws Exception
{
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
))
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
))
);
}
@Test
public void testHashCode() throws Exception
{
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).hashCode()
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
)).hashCode()
);
}
private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException
{
return mapper.readValue(mapper.writeValueAsString(config), RemoteTaskRunnerConfig.class);
}
private RemoteTaskRunnerConfig generateRemoteTaskRunnerConfig(
Period taskAssignmentTimeout,
Period taskCleanupTimeout,
String minWorkerVersion,
long maxZnodeBytes,
Period taskShutdownLinkTimeout
)
{
final Map<String, Object> objectMap = new HashMap<>();
objectMap.put("taskAssignmentTimeout", taskAssignmentTimeout);
objectMap.put("taskCleanupTimeout", taskCleanupTimeout);
objectMap.put("minWorkerVersion", minWorkerVersion);
objectMap.put("maxZnodeBytes", maxZnodeBytes);
objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout);
return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class);
}
}

View File

@ -73,6 +73,7 @@ import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class OverlordResourceTest public class OverlordResourceTest
{ {
@ -274,6 +275,7 @@ public class OverlordResourceTest
private CountDownLatch[] runLatches; private CountDownLatch[] runLatches;
private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems; private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
private List<String> runningTasks; private List<String> runningTasks;
private final AtomicBoolean started = new AtomicBoolean(false);
public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches) public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches)
{ {
@ -289,12 +291,6 @@ public class OverlordResourceTest
return ImmutableList.of(); return ImmutableList.of();
} }
@Override
public void stop()
{
// Do nothing
}
@Override @Override
public synchronized ListenableFuture<TaskStatus> run(final Task task) public synchronized ListenableFuture<TaskStatus> run(final Task task)
{ {
@ -366,5 +362,17 @@ public class OverlordResourceTest
{ {
return Optional.absent(); return Optional.absent();
} }
@Override
public void start()
{
started.set(true);
}
@Override
public void stop()
{
started.set(false);
}
} }
} }

View File

@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -42,12 +43,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0, new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
), ),
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 1, new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),
@ -75,12 +76,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2,
Sets.<String>newHashSet() Sets.<String>newHashSet()
), ),
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5, new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),
@ -108,12 +109,12 @@ public class EqualDistributionWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5,
Sets.<String>newHashSet() Sets.<String>newHashSet()
), ),
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5, new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),

View File

@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -45,12 +46,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0, new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
), ),
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0, new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),
@ -79,12 +80,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"lhost", "lhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0, new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
), ),
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0, new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),
@ -106,7 +107,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
ImmutableMap.of( ImmutableMap.of(
"localhost", "localhost",
new ImmutableZkWorker( new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0, new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet() Sets.<String>newHashSet()
) )
), ),

View File

@ -54,6 +54,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -112,7 +113,8 @@ public class WorkerTaskMonitorTest
"worker", "worker",
"localhost", "localhost",
3, 3,
"0" "0",
DateTime.now()
); );
workerCuratorCoordinator = new WorkerCuratorCoordinator( workerCuratorCoordinator = new WorkerCuratorCoordinator(

View File

@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -73,7 +74,8 @@ public class WorkerResourceTest
"host", "host",
"ip", "ip",
3, 3,
"v1" "v1",
DateTime.now()
); );
curatorCoordinator = new WorkerCuratorCoordinator( curatorCoordinator = new WorkerCuratorCoordinator(

View File

@ -19,6 +19,7 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import org.junit.runner.Description; import org.junit.runner.Description;
@ -59,7 +60,7 @@ public class CloserRule implements TestRule
} }
finally { finally {
Throwable exception = null; Throwable exception = null;
for (AutoCloseable autoCloseable : autoCloseables) { for (AutoCloseable autoCloseable : Lists.reverse(autoCloseables)) {
try { try {
autoCloseable.close(); autoCloseable.close();
} }
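
A tiny, self-contained illustration (hypothetical, not part of the commit) of why CloserRule now closes in reverse registration order: later resources often depend on earlier ones, so they should be released first.

import com.google.common.collect.Lists;

import java.util.List;

public class ReverseCloseSketch
{
  public static void main(String[] args) throws Exception
  {
    final List<AutoCloseable> autoCloseables = Lists.newArrayList();
    autoCloseables.add(new AutoCloseable()
    {
      @Override
      public void close()
      {
        System.out.println("first-registered resource closed last");
      }
    });
    autoCloseables.add(new AutoCloseable()
    {
      @Override
      public void close()
      {
        System.out.println("last-registered resource closed first");
      }
    });
    // Mirrors the CloserRule change: iterate in reverse so dependents close before their dependencies.
    for (AutoCloseable closeable : Lists.reverse(autoCloseables)) {
      closeable.close();
    }
  }
}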

View File

@ -48,6 +48,7 @@ import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import java.util.List; import java.util.List;
@ -106,7 +107,8 @@ public class CliMiddleManager extends ServerRunnable
node.getHostAndPort(), node.getHostAndPort(),
config.getIp(), config.getIp(),
config.getCapacity(), config.getCapacity(),
config.getVersion() config.getVersion(),
DateTime.now()
); );
} }
}, },

View File

@ -60,6 +60,7 @@ import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskRunnerFactory;
import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig;
@ -197,7 +198,7 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null); JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
@ -206,7 +207,7 @@ public class CliOverlord extends ServerRunnable
private void configureAutoscale(Binder binder) private void configureAutoscale(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class) binder.bind(new TypeLiteral<ResourceManagementStrategy<WorkerTaskRunner>>(){})
.to(SimpleResourceManagementStrategy.class) .to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class); .in(LazySingleton.class);
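
With the TypeLiteral binding above, Guice can inject the parameterized strategy directly. A hedged sketch of a consumer follows; the class is hypothetical and only shows how the binding would be resolved.

import com.google.inject.Inject;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;

public class AutoScaleConsumerSketch
{
  private final ResourceManagementStrategy<WorkerTaskRunner> strategy;

  @Inject
  public AutoScaleConsumerSketch(ResourceManagementStrategy<WorkerTaskRunner> strategy)
  {
    // Guice resolves this parameter against the TypeLiteral binding added in CliOverlord.
    this.strategy = strategy;
  }

  public ResourceManagementStrategy<WorkerTaskRunner> getStrategy()
  {
    return strategy;
  }
}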