From ac13a5942a5936a7935829baa84f5c8df8f84f4d Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 11 Jan 2016 15:48:48 -0800 Subject: [PATCH] Use Worker instead of ZkWorker whenver possible * Moves last run task state information to Worker * Makes WorkerTaskRunner a TaskRunner which has interfaces to help with getting information about a Worker --- .../indexing/overlord/ForkingTaskRunner.java | 6 + .../indexing/overlord/RemoteTaskRunner.java | 51 ++- .../overlord/RemoteTaskRunnerFactory.java | 3 +- .../druid/indexing/overlord/TaskRunner.java | 6 + .../overlord/ThreadPoolTaskRunner.java | 7 + .../indexing/overlord/WorkerTaskRunner.java | 47 +++ .../io/druid/indexing/overlord/ZkWorker.java | 8 +- .../SimpleResourceManagementStrategy.java | 77 ++-- .../config/RemoteTaskRunnerConfig.java | 39 ++ .../java/io/druid/indexing/worker/Worker.java | 26 +- .../indexing/worker/http/WorkerResource.java | 42 +- .../overlord/RemoteTaskRunnerTest.java | 30 +- .../SimpleResourceManagementStrategyTest.java | 64 +-- .../config/RemoteTaskRunnerConfigTest.java | 386 ++++++++++++++++++ .../overlord/http/OverlordResourceTest.java | 20 +- ...lDistributionWorkerSelectStrategyTest.java | 13 +- ...yWithAffinityWorkerSelectStrategyTest.java | 11 +- .../worker/WorkerTaskMonitorTest.java | 4 +- .../worker/http/WorkerResourceTest.java | 4 +- .../java/io/druid/segment/CloserRule.java | 3 +- .../java/io/druid/cli/CliMiddleManager.java | 4 +- .../main/java/io/druid/cli/CliOverlord.java | 5 +- 22 files changed, 715 insertions(+), 141 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index dddf21bf324..32d421289e6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -578,6 +578,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer return Optional.absent(); } + @Override + public void start() + { + // No state setup required + } + @Override public Optional streamTaskLog(final String taskid, final long offset) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index a4efbad057f..1a290c8bb2c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -22,6 +22,7 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -29,6 +30,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -42,7 +44,6 @@ import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.RE; import com.metamx.common.lifecycle.LifecycleStart; @@ -53,7 +54,6 @@ import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.concurrent.Execs; import io.druid.curator.CuratorUtils; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; @@ -113,7 +113,7 @@ import java.util.concurrent.TimeUnit; *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ -public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer +public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer { private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class); private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); @@ -153,7 +153,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final ListeningScheduledExecutorService cleanupExec; private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>(); - private final ResourceManagementStrategy resourceManagement; + private final ResourceManagementStrategy resourceManagement; public RemoteTaskRunner( ObjectMapper jsonMapper, @@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer HttpClient httpClient, Supplier workerConfigRef, ScheduledExecutorService cleanupExec, - ResourceManagementStrategy resourceManagement + ResourceManagementStrategy resourceManagement ) { this.jsonMapper = jsonMapper; @@ -180,6 +180,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer this.resourceManagement = resourceManagement; } + @Override @LifecycleStart public void start() { @@ -298,6 +299,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } } + @Override @LifecycleStop public void stop() { @@ -325,7 +327,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return ImmutableList.of(); } - public Collection getWorkers() + @Override + public Collection getWorkers() + { + return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values())); + } + + public Collection getZkWorkers() { return ImmutableList.copyOf(zkWorkers.values()); } @@ -1018,7 +1026,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer taskRunnerWorkItem.setResult(taskStatus); } - public List markWorkersLazy(Predicate isLazyWorker, int maxWorkers) + @Override + public Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers) { // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy synchronized (statusLock) { @@ -1027,7 +1036,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer String worker = iterator.next(); ZkWorker zkWorker = zkWorkers.get(worker); try { - if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) { + if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) { log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); lazyWorkers.put(worker, zkWorker); if (lazyWorkers.size() == maxWorkers) { @@ -1040,13 +1049,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer throw Throwables.propagate(e); } } - return ImmutableList.copyOf(lazyWorkers.values()); + return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values())); } } - private List getAssignedTasks(Worker worker) throws Exception + protected List getAssignedTasks(Worker worker) throws Exception { - List assignedTasks = Lists.newArrayList( + final List assignedTasks = Lists.newArrayList( cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost())) ); @@ -1066,9 +1075,25 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return assignedTasks; } - public List getLazyWorkers() + @Override + public Collection getLazyWorkers() { - return ImmutableList.copyOf(lazyWorkers.values()); + return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values())); + } + + public static Collection getWorkerFromZK(Collection workers) + { + return Collections2.transform( + workers, + new Function() + { + @Override + public Worker apply(ZkWorker input) + { + return input.getWorker(); + } + } + ); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 465c3b9d7e9..376034ff2cd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledExecutorService; */ public class RemoteTaskRunnerFactory implements TaskRunnerFactory { + public static final String TYPE_NAME = "remote"; private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class); private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; @@ -83,7 +84,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory resourceManagementStrategy; + final ResourceManagementStrategy resourceManagementStrategy; if (resourceManagementSchedulerConfig.isDoAutoscale()) { resourceManagementStrategy = new SimpleResourceManagementStrategy( config, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java index 8802c2cd297..39f3629cddb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java @@ -31,6 +31,7 @@ import java.util.List; /** * Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}. + * Holds state */ public interface TaskRunner { @@ -75,4 +76,9 @@ public interface TaskRunner * @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise */ Optional getScalingStats(); + + /** + * Start the state of the runner + */ + void start(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index f7ae62f4043..b7a78c2cb9d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -102,6 +102,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker ); } + @Override @LifecycleStop public void stop() { @@ -259,6 +260,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker return Optional.absent(); } + @Override + public void start() + { + // No state startup required + } + @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java new file mode 100644 index 00000000000..42dfa8c0d66 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.indexing.overlord; + +import com.google.common.base.Predicate; +import io.druid.indexing.worker.Worker; + +import java.util.Collection; + +public interface WorkerTaskRunner extends TaskRunner +{ + /** + * List of known workers who can accept tasks + * @return A list of workers who can accept tasks for running + */ + Collection getWorkers(); + + /** + * Return a list of workers who can be reaped by autoscaling + * @return Workers which can be reaped by autoscaling + */ + Collection getLazyWorkers(); + + /** + * Check which workers can be marked as lazy + * @param isLazyWorker + * @param maxWorkers + * @return + */ + Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index 6dbb1887a99..9a48b98dd58 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -50,7 +50,6 @@ public class ZkWorker implements Closeable private final Function cacheConverter; private AtomicReference worker; - private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime()); public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { @@ -129,7 +128,7 @@ public class ZkWorker implements Closeable @JsonProperty public DateTime getLastCompletedTaskTime() { - return lastCompletedTaskTime.get(); + return worker.get().getLastCompletedTaskTime(); } public boolean isRunningTask(String taskId) @@ -139,7 +138,7 @@ public class ZkWorker implements Closeable public boolean isValidVersion(String minVersion) { - return worker.get().getVersion().compareTo(minVersion) >= 0; + return worker.get().isValidVersion(minVersion); } public void setWorker(Worker newWorker) @@ -153,7 +152,7 @@ public class ZkWorker implements Closeable public void setLastCompletedTaskTime(DateTime completedTaskTime) { - lastCompletedTaskTime.set(completedTaskTime); + worker.get().setLastCompletedTaskTime(completedTaskTime); } public ImmutableZkWorker toImmutable() @@ -172,7 +171,6 @@ public class ZkWorker implements Closeable { return "ZkWorker{" + "worker=" + worker + - ", lastCompletedTaskTime=" + lastCompletedTaskTime + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java index 5b82f0315b0..9de51b14ba6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java @@ -24,6 +24,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -33,10 +34,10 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import io.druid.granularity.PeriodGranularity; -import io.druid.indexing.overlord.RemoteTaskRunner; +import io.druid.indexing.overlord.WorkerTaskRunner; import io.druid.indexing.overlord.TaskRunnerWorkItem; -import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -48,7 +49,7 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class SimpleResourceManagementStrategy implements ResourceManagementStrategy +public class SimpleResourceManagementStrategy implements ResourceManagementStrategy { private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); @@ -99,10 +100,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat this.exec = exec; } - boolean doProvision(RemoteTaskRunner runner) + boolean doProvision(WorkerTaskRunner runner) { Collection pendingTasks = runner.getPendingTasks(); - Collection zkWorkers = getWorkers(runner); + Collection workers = getWorkers(runner); synchronized (lock) { boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); @@ -110,19 +111,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat log.warn("No workerConfig available, cannot provision new workers."); return false; } - final Predicate isValidWorker = createValidWorkerPredicate(config); - final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); + final Predicate isValidWorker = createValidWorkerPredicate(config); + final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); final List workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup( Lists.newArrayList( - Iterables.transform( - zkWorkers, - new Function() + Iterables.transform( + workers, + new Function() { @Override - public String apply(ZkWorker input) + public String apply(Worker input) { - return input.getWorker().getIp(); + return input.getIp(); } } ) @@ -130,7 +131,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat ); currentlyProvisioning.removeAll(workerNodeIds); - updateTargetWorkerCount(workerConfig, pendingTasks, zkWorkers); + updateTargetWorkerCount(workerConfig, pendingTasks, workers); int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); while (want > 0) { @@ -167,7 +168,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - boolean doTerminate(RemoteTaskRunner runner) + boolean doTerminate(WorkerTaskRunner runner) { Collection pendingTasks = runner.getPendingTasks(); synchronized (lock) { @@ -183,12 +184,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat Lists.newArrayList( Iterables.transform( runner.getLazyWorkers(), - new Function() + new Function() { @Override - public String apply(ZkWorker input) + public String apply(Worker input) { - return input.getWorker().getIp(); + return input.getIp(); } } ) @@ -205,23 +206,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat currentlyTerminating.clear(); currentlyTerminating.addAll(stillExisting); - Collection workers = getWorkers(runner); + Collection workers = getWorkers(runner); updateTargetWorkerCount(workerConfig, pendingTasks, workers); if (currentlyTerminating.isEmpty()) { final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; if (excessWorkers > 0) { - final Predicate isLazyWorker = createLazyWorkerPredicate(config); - final List laziestWorkerIps = - Lists.transform( + final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Collection laziestWorkerIps = + Collections2.transform( runner.markWorkersLazy(isLazyWorker, excessWorkers), - new Function() + new Function() { @Override - public String apply(ZkWorker zkWorker) + public String apply(Worker zkWorker) { - return zkWorker.getWorker().getIp(); + return zkWorker.getIp(); } } ); @@ -235,7 +236,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat Joiner.on(", ").join(laziestWorkerIps) ); - final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(laziestWorkerIps); + final AutoScalingData terminated = workerConfig.getAutoScaler().terminate(ImmutableList.copyOf(laziestWorkerIps)); if (terminated != null) { currentlyTerminating.addAll(terminated.getNodeIds()); lastTerminateTime = new DateTime(); @@ -264,7 +265,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } @Override - public void startManagement(final RemoteTaskRunner runner) + public void startManagement(final WorkerTaskRunner runner) { synchronized (lock) { if (started) { @@ -333,16 +334,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return scalingStats; } - private static Predicate createLazyWorkerPredicate( + private static Predicate createLazyWorkerPredicate( final SimpleResourceManagementConfig config ) { - final Predicate isValidWorker = createValidWorkerPredicate(config); + final Predicate isValidWorker = createValidWorkerPredicate(config); - return new Predicate() + return new Predicate() { @Override - public boolean apply(ZkWorker worker) + public boolean apply(Worker worker) { final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); @@ -351,20 +352,20 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat }; } - private static Predicate createValidWorkerPredicate( + private static Predicate createValidWorkerPredicate( final SimpleResourceManagementConfig config ) { - return new Predicate() + return new Predicate() { @Override - public boolean apply(ZkWorker zkWorker) + public boolean apply(Worker worker) { final String minVersion = config.getWorkerVersion(); if (minVersion == null) { throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database."); } - return zkWorker.isValidVersion(minVersion); + return worker.isValidVersion(minVersion); } }; } @@ -372,15 +373,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private void updateTargetWorkerCount( final WorkerBehaviorConfig workerConfig, final Collection pendingTasks, - final Collection zkWorkers + final Collection zkWorkers ) { synchronized (lock) { - final Collection validWorkers = Collections2.filter( + final Collection validWorkers = Collections2.filter( zkWorkers, createValidWorkerPredicate(config) ); - final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Predicate isLazyWorker = createLazyWorkerPredicate(config); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); @@ -463,7 +464,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - public Collection getWorkers(RemoteTaskRunner runner) + public Collection getWorkers(WorkerTaskRunner runner) { return runner.getWorkers(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index 33182024d33..fce9c8fbf7e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -72,4 +72,43 @@ public class RemoteTaskRunnerConfig { return taskShutdownLinkTimeout; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RemoteTaskRunnerConfig that = (RemoteTaskRunnerConfig) o; + + if (getMaxZnodeBytes() != that.getMaxZnodeBytes()) { + return false; + } + if (!getTaskAssignmentTimeout().equals(that.getTaskAssignmentTimeout())) { + return false; + } + if (!getTaskCleanupTimeout().equals(that.getTaskCleanupTimeout())) { + return false; + } + if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) { + return false; + } + return getTaskShutdownLinkTimeout().equals(that.getTaskShutdownLinkTimeout()); + + } + + @Override + public int hashCode() + { + int result = getTaskAssignmentTimeout().hashCode(); + result = 31 * result + getTaskCleanupTimeout().hashCode(); + result = 31 * result + getMinWorkerVersion().hashCode(); + result = 31 * result + getMaxZnodeBytes(); + result = 31 * result + getTaskShutdownLinkTimeout().hashCode(); + return result; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java index 2bfa4f29c3d..21772f0ce9b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java @@ -21,6 +21,9 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.DateTime; + +import java.util.concurrent.atomic.AtomicReference; /** * A container for worker metadata. @@ -31,19 +34,24 @@ public class Worker private final String ip; private final int capacity; private final String version; + private final AtomicReference lastCompletedTaskTime; @JsonCreator public Worker( @JsonProperty("host") String host, @JsonProperty("ip") String ip, @JsonProperty("capacity") int capacity, - @JsonProperty("version") String version + @JsonProperty("version") String version, + @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime ) { this.host = host; this.ip = ip; this.capacity = capacity; this.version = version; + this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null + ? DateTime.now() + : lastCompletedTaskTime); } @JsonProperty @@ -70,6 +78,22 @@ public class Worker return version; } + @JsonProperty + public DateTime getLastCompletedTaskTime() + { + return lastCompletedTaskTime.get(); + } + + public void setLastCompletedTaskTime(DateTime completedTaskTime) + { + lastCompletedTaskTime.set(completedTaskTime); + } + + public boolean isValidVersion(final String minVersion) + { + return getVersion().compareTo(minVersion) >= 0; + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index ce668de7657..a83ec1ecace 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -27,10 +27,11 @@ import com.google.common.collect.Lists; import com.google.common.io.ByteSource; import com.google.inject.Inject; import com.metamx.common.logger.Logger; -import io.druid.indexing.overlord.ForkingTaskRunner; +import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.WorkerCuratorCoordinator; +import io.druid.tasklogs.TaskLogStreamer; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -52,20 +53,18 @@ public class WorkerResource private static String DISABLED_VERSION = ""; private final Worker enabledWorker; - private final Worker disabledWorker; private final WorkerCuratorCoordinator curatorCoordinator; - private final ForkingTaskRunner taskRunner; + private final TaskRunner taskRunner; @Inject public WorkerResource( Worker worker, WorkerCuratorCoordinator curatorCoordinator, - ForkingTaskRunner taskRunner + TaskRunner taskRunner ) throws Exception { this.enabledWorker = worker; - this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION); this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; } @@ -77,6 +76,13 @@ public class WorkerResource public Response doDisable() { try { + final Worker disabledWorker = new Worker( + enabledWorker.getHost(), + enabledWorker.getIp(), + enabledWorker.getCapacity(), + DISABLED_VERSION, + enabledWorker.getLastCompletedTaskTime() + ); curatorCoordinator.updateWorkerAnnouncement(disabledWorker); return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); } @@ -164,18 +170,26 @@ public class WorkerResource @QueryParam("offset") @DefaultValue("0") long offset ) { - final Optional stream = taskRunner.streamTaskLog(taskid, offset); + if (!(taskRunner instanceof TaskLogStreamer)) { + return Response.status(501) + .entity(String.format( + "Log streaming not supported by [%s]", + taskRunner.getClass().getCanonicalName() + )) + .build(); + } + try { + final Optional stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskid, offset); - if (stream.isPresent()) { - try { + if (stream.isPresent()) { return Response.ok(stream.get().openStream()).build(); + } else { + return Response.status(Response.Status.NOT_FOUND).build(); } - catch (IOException e) { - log.warn(e, "Failed to read log for task: %s", taskid); - return Response.serverError().build(); - } - } else { - return Response.status(Response.Status.NOT_FOUND).build(); + } + catch (IOException e) { + log.warn(e, "Failed to read log for task: %s", taskid); + return Response.serverError().build(); } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index c3fc05ae9e1..57209c793fa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -56,6 +56,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -358,7 +359,7 @@ public class RemoteTaskRunnerTest doSetup(); final Set existingTasks = Sets.newHashSet(); - for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) { + for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) { existingTasks.addAll(zkWorker.getRunningTasks().keySet()); } Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); @@ -450,7 +451,7 @@ public class RemoteTaskRunnerTest Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); // Confirm RTR thinks the worker is disabled. - Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion()); + Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion()); } private void doSetup() throws Exception @@ -479,7 +480,7 @@ public class RemoteTaskRunnerTest null, DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())), ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), - new NoopResourceManagementStrategy() + new NoopResourceManagementStrategy() ); remoteTaskRunner.start(); @@ -491,7 +492,8 @@ public class RemoteTaskRunnerTest "worker", "localhost", 3, - "0" + "0", + DateTime.now() ); cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( @@ -504,7 +506,7 @@ public class RemoteTaskRunnerTest { cf.setData().forPath( announcementsPath, - jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "")) + jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now())) ); } @@ -575,11 +577,11 @@ public class RemoteTaskRunnerTest remoteTaskRunner.run(task); Assert.assertTrue(taskAnnounced(task.getId())); mockWorkerRunningTask(task); - Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + Collection lazyworkers = remoteTaskRunner.markWorkersLazy( + new Predicate() { @Override - public boolean apply(ZkWorker input) + public boolean apply(Worker input) { return true; } @@ -596,11 +598,11 @@ public class RemoteTaskRunnerTest doSetup(); remoteTaskRunner.run(task); Assert.assertTrue(taskAnnounced(task.getId())); - Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + Collection lazyworkers = remoteTaskRunner.markWorkersLazy( + new Predicate() { @Override - public boolean apply(ZkWorker input) + public boolean apply(Worker input) { return true; } @@ -615,11 +617,11 @@ public class RemoteTaskRunnerTest public void testFindLazyWorkerNotRunningAnyTask() throws Exception { doSetup(); - Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + Collection lazyworkers = remoteTaskRunner.markWorkersLazy( + new Predicate() { @Override - public boolean apply(ZkWorker input) + public boolean apply(Worker input) { return true; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index 1c477b44145..32adbafc5c4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -142,8 +142,8 @@ public class SimpleResourceManagementStrategyTest ) ); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ); EasyMock.replay(runner); @@ -178,8 +178,8 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ).times(2); EasyMock.replay(runner); @@ -235,8 +235,8 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ).times(2); EasyMock.replay(runner); @@ -286,16 +286,16 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ).times(2); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ); - EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); EasyMock.replay(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); @@ -328,14 +328,14 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ).times(2); - EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.singletonList( - new TestZkWorker(testTask) + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.singletonList( + new TestZkWorker(testTask).getWorker() ) ); EasyMock.replay(runner); @@ -377,14 +377,14 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Arrays.asList( - new TestZkWorker(NoopTask.create()), - new TestZkWorker(NoopTask.create()) + Arrays.asList( + new TestZkWorker(NoopTask.create()).getWorker(), + new TestZkWorker(NoopTask.create()).getWorker() ) ).times(2); - EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.emptyList() + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.emptyList() ); EasyMock.replay(runner); @@ -422,13 +422,13 @@ public class SimpleResourceManagementStrategyTest Collections.emptyList() ).times(3); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0") + Collections.singletonList( + new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker() ) ).times(3); - EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.emptyList() + EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( + Collections.emptyList() ); EasyMock.replay(runner); @@ -486,8 +486,8 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(null) + Collections.singletonList( + new TestZkWorker(null).getWorker() ) ).times(1); EasyMock.replay(runner); @@ -525,7 +525,7 @@ public class SimpleResourceManagementStrategyTest String version ) { - super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper()); + super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper()); this.testTask = testTask; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java new file mode 100644 index 00000000000..24927d25e37 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class RemoteTaskRunnerConfigTest +{ + private static final ObjectMapper mapper = new DefaultObjectMapper(); + private static final Period DEFAULT_TIMEOUT = Period.ZERO; + private static final String DEFAULT_VERSION = ""; + private static final long DEFAULT_MAX_ZNODE = 10 * 1024; + + @Test + public void testGetTaskAssignmentTimeout() throws Exception + { + final Period timeout = Period.hours(1); + Assert.assertEquals( + timeout, + reflect(generateRemoteTaskRunnerConfig( + timeout, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )).getTaskAssignmentTimeout() + ); + } + + @Test + public void testGetTaskCleanupTimeout() throws Exception + { + final Period timeout = Period.hours(1); + Assert.assertEquals( + timeout, + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + timeout, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )).getTaskCleanupTimeout() + ); + } + + @Test + public void testGetMinWorkerVersion() throws Exception + { + final String version = "some version"; + Assert.assertEquals( + version, + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + version, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )).getMinWorkerVersion() + ); + } + + @Test + public void testGetMaxZnodeBytes() throws Exception + { + final long max = 20 * 1024; + Assert.assertEquals( + max, + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + max, + DEFAULT_TIMEOUT + )).getMaxZnodeBytes() + ); + } + + @Test + public void testGetTaskShutdownLinkTimeout() throws Exception + { + final Period timeout = Period.hours(1); + Assert.assertEquals( + timeout, + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + timeout + )).getTaskShutdownLinkTimeout() + ); + } + + @Test + public void testEquals() throws Exception + { + Assert.assertEquals( + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )), + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )) + ); + final Period timeout = Period.years(999); + final String version = "someVersion"; + final long max = 20 * 1024; + Assert.assertEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )) + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + timeout, + version, + max, + timeout + )) + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + DEFAULT_TIMEOUT, + version, + max, + timeout + )) + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + DEFAULT_VERSION, + max, + timeout + )) + ); + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + DEFAULT_MAX_ZNODE, + timeout + )) + ); + + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + DEFAULT_TIMEOUT + )) + ); + } + + @Test + public void testHashCode() throws Exception + { + Assert.assertEquals( + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT + )).hashCode() + ); + final Period timeout = Period.years(999); + final String version = "someVersion"; + final long max = 20 * 1024; + Assert.assertEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode() + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + timeout, + version, + max, + timeout + )).hashCode() + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + DEFAULT_TIMEOUT, + version, + max, + timeout + )).hashCode() + ); + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + DEFAULT_VERSION, + max, + timeout + )).hashCode() + ); + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + DEFAULT_MAX_ZNODE, + timeout + )).hashCode() + ); + + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + DEFAULT_TIMEOUT + )).hashCode() + ); + } + + private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException + { + return mapper.readValue(mapper.writeValueAsString(config), RemoteTaskRunnerConfig.class); + } + + private RemoteTaskRunnerConfig generateRemoteTaskRunnerConfig( + Period taskAssignmentTimeout, + Period taskCleanupTimeout, + String minWorkerVersion, + long maxZnodeBytes, + Period taskShutdownLinkTimeout + ) + { + final Map objectMap = new HashMap<>(); + objectMap.put("taskAssignmentTimeout", taskAssignmentTimeout); + objectMap.put("taskCleanupTimeout", taskCleanupTimeout); + objectMap.put("minWorkerVersion", minWorkerVersion); + objectMap.put("maxZnodeBytes", maxZnodeBytes); + objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout); + return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 24a6e2bdc7d..59890f5b48b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -73,6 +73,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class OverlordResourceTest { @@ -274,6 +275,7 @@ public class OverlordResourceTest private CountDownLatch[] runLatches; private ConcurrentHashMap taskRunnerWorkItems; private List runningTasks; + private final AtomicBoolean started = new AtomicBoolean(false); public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches) { @@ -289,12 +291,6 @@ public class OverlordResourceTest return ImmutableList.of(); } - @Override - public void stop() - { - // Do nothing - } - @Override public synchronized ListenableFuture run(final Task task) { @@ -366,5 +362,17 @@ public class OverlordResourceTest { return Optional.absent(); } + + @Override + public void start() + { + started.set(true); + } + + @Override + public void stop() + { + started.set(false); + } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index b43b5c6c482..ec64c8b505f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.worker.Worker; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; @@ -42,12 +43,12 @@ public class EqualDistributionWorkerSelectStrategyTest ImmutableMap.of( "lhost", new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1"), 0, + new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ), "localhost", new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1"), 1, + new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1, Sets.newHashSet() ) ), @@ -75,12 +76,12 @@ public class EqualDistributionWorkerSelectStrategyTest ImmutableMap.of( "lhost", new ImmutableZkWorker( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2, Sets.newHashSet() ), "localhost", new ImmutableZkWorker( - new Worker("enableHost", "enableHost", 10, "v1"), 5, + new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5, Sets.newHashSet() ) ), @@ -108,12 +109,12 @@ public class EqualDistributionWorkerSelectStrategyTest ImmutableMap.of( "lhost", new ImmutableZkWorker( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5, Sets.newHashSet() ), "localhost", new ImmutableZkWorker( - new Worker("enableHost", "enableHost", 10, "v1"), 5, + new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5, Sets.newHashSet() ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java index d4169e73ec4..99a9571b553 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.worker.Worker; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; @@ -45,12 +46,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest ImmutableMap.of( "lhost", new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1"), 0, + new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ), "localhost", new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1"), 0, + new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ) ), @@ -79,12 +80,12 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest ImmutableMap.of( "lhost", new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1"), 0, + new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ), "localhost", new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1"), 0, + new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ) ), @@ -106,7 +107,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest ImmutableMap.of( "localhost", new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1"), 0, + new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, Sets.newHashSet() ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 8996794fd9e..0105cde6172 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -54,6 +54,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -112,7 +113,8 @@ public class WorkerTaskMonitorTest "worker", "localhost", 3, - "0" + "0", + DateTime.now() ); workerCuratorCoordinator = new WorkerCuratorCoordinator( diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index edfce8a296a..acd05a07241 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -73,7 +74,8 @@ public class WorkerResourceTest "host", "ip", 3, - "v1" + "v1", + DateTime.now() ); curatorCoordinator = new WorkerCuratorCoordinator( diff --git a/processing/src/test/java/io/druid/segment/CloserRule.java b/processing/src/test/java/io/druid/segment/CloserRule.java index addc6a5492c..1a64aa2d0ec 100644 --- a/processing/src/test/java/io/druid/segment/CloserRule.java +++ b/processing/src/test/java/io/druid/segment/CloserRule.java @@ -19,6 +19,7 @@ package io.druid.segment; +import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import org.junit.rules.TestRule; import org.junit.runner.Description; @@ -59,7 +60,7 @@ public class CloserRule implements TestRule } finally { Throwable exception = null; - for (AutoCloseable autoCloseable : autoCloseables) { + for (AutoCloseable autoCloseable : Lists.reverse(autoCloseables)) { try { autoCloseable.close(); } diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index e8435659a32..03e7e013a4e 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -48,6 +48,7 @@ import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.server.DruidNode; import io.druid.server.initialization.jetty.JettyServerInitializer; import org.eclipse.jetty.server.Server; +import org.joda.time.DateTime; import java.util.List; @@ -106,7 +107,8 @@ public class CliMiddleManager extends ServerRunnable node.getHostAndPort(), config.getIp(), config.getCapacity(), - config.getVersion() + config.getVersion(), + DateTime.now() ); } }, diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 551064aaafd..1ce74669b56 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -60,6 +60,7 @@ import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskRunnerFactory; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; +import io.druid.indexing.overlord.WorkerTaskRunner; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; @@ -197,7 +198,7 @@ public class CliOverlord extends ServerRunnable biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); - biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null); @@ -206,7 +207,7 @@ public class CliOverlord extends ServerRunnable private void configureAutoscale(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); - binder.bind(ResourceManagementStrategy.class) + binder.bind(new TypeLiteral>(){}) .to(SimpleResourceManagementStrategy.class) .in(LazySingleton.class);