diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java new file mode 100644 index 00000000000..9d4026004f7 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -0,0 +1,150 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.worker.Worker; +import org.joda.time.DateTime; + +import java.util.Collection; +import java.util.Set; + +/** + * A snapshot of a Worker and its current state i.e tasks assigned to that worker. + */ +public class ImmutableWorkerInfo +{ + private final Worker worker; + private final int currCapacityUsed; + private final ImmutableSet availabilityGroups; + private final ImmutableSet runningTasks; + private final DateTime lastCompletedTaskTime; + + @JsonCreator + public ImmutableWorkerInfo( + @JsonProperty("worker") Worker worker, + @JsonProperty("currCapacityUsed") int currCapacityUsed, + @JsonProperty("availabilityGroups") Set availabilityGroups, + @JsonProperty("runningTasks") Collection runningTasks, + @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime + ) + { + this.worker = worker; + this.currCapacityUsed = currCapacityUsed; + this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); + this.runningTasks = ImmutableSet.copyOf(runningTasks); + this.lastCompletedTaskTime = lastCompletedTaskTime; + } + + @JsonProperty("worker") + public Worker getWorker() + { + return worker; + } + + @JsonProperty("currCapacityUsed") + public int getCurrCapacityUsed() + { + return currCapacityUsed; + } + + @JsonProperty("availabilityGroups") + public Set getAvailabilityGroups() + { + return availabilityGroups; + } + + @JsonProperty("runningTasks") + public Set getRunningTasks() + { + return runningTasks; + } + + @JsonProperty("lastCompletedTaskTime") + public DateTime getLastCompletedTaskTime() + { + return lastCompletedTaskTime; + } + + public boolean isValidVersion(String minVersion) + { + return worker.getVersion().compareTo(minVersion) >= 0; + } + + public boolean canRunTask(Task task) + { + return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() + && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ImmutableWorkerInfo that = (ImmutableWorkerInfo) o; + + if (currCapacityUsed != that.currCapacityUsed) { + return false; + } + if (!worker.equals(that.worker)) { + return false; + } + if (!availabilityGroups.equals(that.availabilityGroups)) { + return false; + } + if (!runningTasks.equals(that.runningTasks)) { + return false; + } + return lastCompletedTaskTime.equals(that.lastCompletedTaskTime); + + } + + @Override + public int hashCode() + { + int result = worker.hashCode(); + result = 31 * result + currCapacityUsed; + result = 31 * result + availabilityGroups.hashCode(); + result = 31 * result + runningTasks.hashCode(); + result = 31 * result + lastCompletedTaskTime.hashCode(); + return result; + } + + @Override + public String toString() + { + return "ImmutableWorkerInfo{" + + "worker=" + worker + + ", currCapacityUsed=" + currCapacityUsed + + ", availabilityGroups=" + availabilityGroups + + ", runningTasks=" + runningTasks + + ", lastCompletedTaskTime=" + lastCompletedTaskTime + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java deleted file mode 100644 index 347a768fcb5..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.overlord; - -import com.google.common.collect.ImmutableSet; -import io.druid.indexing.common.task.Task; -import io.druid.indexing.worker.Worker; - -import java.util.Set; - -/** - * A snapshot of a {@link io.druid.indexing.overlord.ZkWorker} - */ -public class ImmutableZkWorker -{ - private final Worker worker; - private final int currCapacityUsed; - private final ImmutableSet availabilityGroups; - - public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set availabilityGroups) - { - this.worker = worker; - this.currCapacityUsed = currCapacityUsed; - this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); - } - - public Worker getWorker() - { - return worker; - } - - public int getCurrCapacityUsed() - { - return currCapacityUsed; - } - - public Set getAvailabilityGroups() - { - return availabilityGroups; - } - - public boolean isValidVersion(String minVersion) - { - return worker.getVersion().compareTo(minVersion) >= 0; - } - - public boolean canRunTask(Task task) - { - return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() - && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); - } -} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 209ffac3c26..cb279ab088c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -107,13 +107,13 @@ import java.util.concurrent.TimeUnit; * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running. * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup. * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK. - *

+ *

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * fail. The RemoteTaskRunner depends on another component to create additional worker resources. - *

+ *

* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the * worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up. - *

+ *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @@ -365,14 +365,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public Collection getWorkers() + public Collection getWorkers() { - return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values())); - } - - public Collection getZkWorkers() - { - return ImmutableList.copyOf(zkWorkers.values()); + return getImmutableWorkerFromZK(zkWorkers.values()); } @Override @@ -672,7 +667,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ZkWorker assignedWorker = null; try { - final Optional immutableZkWorker = strategy.findWorkerForTask( + final Optional immutableZkWorker = strategy.findWorkerForTask( config, ImmutableMap.copyOf( Maps.transformEntries( @@ -687,10 +682,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } } ), - new Maps.EntryTransformer() + new Maps.EntryTransformer() { @Override - public ImmutableZkWorker transformEntry( + public ImmutableWorkerInfo transformEntry( String key, ZkWorker value ) { @@ -712,7 +707,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; - } finally { + } + finally { if (assignedWorker != null) { workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost()); // note that this is essential as a task might not get a worker because a worker was assigned another task. @@ -1092,7 +1088,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers) + public Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers) { // status lock is used to prevent any tasks being assigned to the worker while we mark it lazy synchronized (statusLock) { @@ -1101,7 +1097,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer String worker = iterator.next(); ZkWorker zkWorker = zkWorkers.get(worker); try { - if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) { + if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) { log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost()); lazyWorkers.put(worker, zkWorker); if (lazyWorkers.size() == maxWorkers) { @@ -1146,6 +1142,23 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values())); } + private static ImmutableList getImmutableWorkerFromZK(Collection workers) + { + return ImmutableList.copyOf( + Collections2.transform( + workers, + new Function() + { + @Override + public ImmutableWorkerInfo apply(ZkWorker input) + { + return input.toImmutable(); + } + } + ) + ); + } + public static Collection getWorkerFromZK(Collection workers) { return Collections2.transform( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java index 42dfa8c0d66..01e90a1cdc8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/WorkerTaskRunner.java @@ -29,7 +29,7 @@ public interface WorkerTaskRunner extends TaskRunner * List of known workers who can accept tasks * @return A list of workers who can accept tasks for running */ - Collection getWorkers(); + Collection getWorkers(); /** * Return a list of workers who can be reaped by autoscaling @@ -43,5 +43,5 @@ public interface WorkerTaskRunner extends TaskRunner * @param maxWorkers * @return */ - Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers); + Collection markWorkersLazy(Predicate isLazyWorker, int maxWorkers); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index 9a48b98dd58..0ba6128875e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -50,6 +50,7 @@ public class ZkWorker implements Closeable private final Function cacheConverter; private AtomicReference worker; + private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime()); public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { @@ -128,7 +129,7 @@ public class ZkWorker implements Closeable @JsonProperty public DateTime getLastCompletedTaskTime() { - return worker.get().getLastCompletedTaskTime(); + return lastCompletedTaskTime.get(); } public boolean isRunningTask(String taskId) @@ -138,7 +139,7 @@ public class ZkWorker implements Closeable public boolean isValidVersion(String minVersion) { - return worker.get().isValidVersion(minVersion); + return worker.get().getVersion().compareTo(minVersion) >= 0; } public void setWorker(Worker newWorker) @@ -152,12 +153,13 @@ public class ZkWorker implements Closeable public void setLastCompletedTaskTime(DateTime completedTaskTime) { - worker.get().setLastCompletedTaskTime(completedTaskTime); + lastCompletedTaskTime.set(completedTaskTime); } - public ImmutableZkWorker toImmutable() + public ImmutableWorkerInfo toImmutable() { - return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups()); + + return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get()); } @Override @@ -171,6 +173,7 @@ public class ZkWorker implements Closeable { return "ZkWorker{" + "worker=" + worker + + ", lastCompletedTaskTime=" + lastCompletedTaskTime + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java index 9de51b14ba6..3d2b1cd43ac 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java @@ -34,8 +34,9 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import io.druid.granularity.PeriodGranularity; -import io.druid.indexing.overlord.WorkerTaskRunner; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.TaskRunnerWorkItem; +import io.druid.indexing.overlord.WorkerTaskRunner; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -103,7 +104,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat boolean doProvision(WorkerTaskRunner runner) { Collection pendingTasks = runner.getPendingTasks(); - Collection workers = getWorkers(runner); + Collection workers = getWorkers(runner); synchronized (lock) { boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); @@ -111,19 +112,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat log.warn("No workerConfig available, cannot provision new workers."); return false; } - final Predicate isValidWorker = createValidWorkerPredicate(config); + final Predicate isValidWorker = createValidWorkerPredicate(config); final int currValidWorkers = Collections2.filter(workers, isValidWorker).size(); final List workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup( Lists.newArrayList( Iterables.transform( workers, - new Function() + new Function() { @Override - public String apply(Worker input) + public String apply(ImmutableWorkerInfo input) { - return input.getIp(); + return input.getWorker().getIp(); } } ) @@ -206,14 +207,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat currentlyTerminating.clear(); currentlyTerminating.addAll(stillExisting); - Collection workers = getWorkers(runner); + Collection workers = getWorkers(runner); updateTargetWorkerCount(workerConfig, pendingTasks, workers); if (currentlyTerminating.isEmpty()) { final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount; if (excessWorkers > 0) { - final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Predicate isLazyWorker = createLazyWorkerPredicate(config); final Collection laziestWorkerIps = Collections2.transform( runner.markWorkersLazy(isLazyWorker, excessWorkers), @@ -334,16 +335,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return scalingStats; } - private static Predicate createLazyWorkerPredicate( + private static Predicate createLazyWorkerPredicate( final SimpleResourceManagementConfig config ) { - final Predicate isValidWorker = createValidWorkerPredicate(config); + final Predicate isValidWorker = createValidWorkerPredicate(config); - return new Predicate() + return new Predicate() { @Override - public boolean apply(Worker worker) + public boolean apply(ImmutableWorkerInfo worker) { final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis() >= config.getWorkerIdleTimeout().toStandardDuration().getMillis(); @@ -352,14 +353,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat }; } - private static Predicate createValidWorkerPredicate( + private static Predicate createValidWorkerPredicate( final SimpleResourceManagementConfig config ) { - return new Predicate() + return new Predicate() { @Override - public boolean apply(Worker worker) + public boolean apply(ImmutableWorkerInfo worker) { final String minVersion = config.getWorkerVersion(); if (minVersion == null) { @@ -373,15 +374,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private void updateTargetWorkerCount( final WorkerBehaviorConfig workerConfig, final Collection pendingTasks, - final Collection zkWorkers + final Collection zkWorkers ) { synchronized (lock) { - final Collection validWorkers = Collections2.filter( + final Collection validWorkers = Collections2.filter( zkWorkers, createValidWorkerPredicate(config) ); - final Predicate isLazyWorker = createLazyWorkerPredicate(config); + final Predicate isLazyWorker = createLazyWorkerPredicate(config); final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers(); final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers(); @@ -464,7 +465,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } - public Collection getWorkers(WorkerTaskRunner runner) + public Collection getWorkers(WorkerTaskRunner runner) { return runner.getWorkers(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 95919322385..af4797b19f4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -416,10 +416,7 @@ public class OverlordResource @Override public Response apply(TaskRunner taskRunner) { - if (taskRunner instanceof RemoteTaskRunner) { - // Use getZkWorkers instead of getWorkers, as they return richer details (like the list of running tasks) - return Response.ok(((RemoteTaskRunner) taskRunner).getZkWorkers()).build(); - } else if (taskRunner instanceof WorkerTaskRunner) { + if (taskRunner instanceof WorkerTaskRunner) { return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); } else { log.debug( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java index 1db1fc2aaef..d96038f0136 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import java.util.Comparator; @@ -35,16 +35,16 @@ import java.util.TreeSet; public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy { @Override - public Optional findWorkerForTask( - RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task + public Optional findWorkerForTask( + RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task ) { - final TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() + final TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() { @Override public int compare( - ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2 + ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2 ) { int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); @@ -64,7 +64,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate sortedWorkers.addAll(zkWorkers.values()); final String minWorkerVer = config.getMinWorkerVersion(); - for (ImmutableZkWorker zkWorker : sortedWorkers) { + for (ImmutableWorkerInfo zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { return Optional.of(zkWorker); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java index 295ee956f3c..df7487f7d75 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java @@ -25,7 +25,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import java.util.List; @@ -58,37 +58,37 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo } @Override - public Optional findWorkerForTask( + public Optional findWorkerForTask( final RemoteTaskRunnerConfig config, - final ImmutableMap zkWorkers, + final ImmutableMap zkWorkers, final Task task ) { // don't run other datasources on affinity workers; we only want our configured datasources to run on them - ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); for (String workerHost : zkWorkers.keySet()) { if (!affinityWorkerHosts.contains(workerHost)) { builder.put(workerHost, zkWorkers.get(workerHost)); } } - ImmutableMap eligibleWorkers = builder.build(); + ImmutableMap eligibleWorkers = builder.build(); List workerHosts = affinityConfig.getAffinity().get(task.getDataSource()); if (workerHosts == null) { return super.findWorkerForTask(config, eligibleWorkers, task); } - ImmutableMap.Builder affinityBuilder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder affinityBuilder = new ImmutableMap.Builder<>(); for (String workerHost : workerHosts) { - ImmutableZkWorker zkWorker = zkWorkers.get(workerHost); + ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost); if (zkWorker != null) { affinityBuilder.put(workerHost, zkWorker); } } - ImmutableMap affinityWorkers = affinityBuilder.build(); + ImmutableMap affinityWorkers = affinityBuilder.build(); if (!affinityWorkers.isEmpty()) { - Optional retVal = super.findWorkerForTask(config, affinityWorkers, task); + Optional retVal = super.findWorkerForTask(config, affinityWorkers, task); if (retVal.isPresent()) { return retVal; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 447fd985739..1116e70bf1d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import java.util.Comparator; @@ -35,18 +35,18 @@ import java.util.TreeSet; public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy { @Override - public Optional findWorkerForTask( + public Optional findWorkerForTask( final RemoteTaskRunnerConfig config, - final ImmutableMap zkWorkers, + final ImmutableMap zkWorkers, final Task task ) { - TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() + TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() { @Override public int compare( - ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2 + ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2 ) { int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); @@ -66,7 +66,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy sortedWorkers.addAll(zkWorkers.values()); final String minWorkerVer = config.getMinWorkerVersion(); - for (ImmutableZkWorker zkWorker : sortedWorkers) { + for (ImmutableWorkerInfo zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { return Optional.of(zkWorker); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java index 7e9fa3b9a48..e22f8aeb051 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java @@ -26,7 +26,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import javax.script.Compilable; @@ -39,7 +39,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy { public static interface SelectorFunction { - public String apply(RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task); + public String apply(RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task); } private final SelectorFunction fnSelector; @@ -61,8 +61,8 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy } @Override - public Optional findWorkerForTask( - RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task + public Optional findWorkerForTask( + RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task ) { String worker = fnSelector.apply(config, zkWorkers, task); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index f6570f68b37..c9127793744 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; /** @@ -46,11 +46,11 @@ public interface WorkerSelectStrategy * @param zkWorkers An immutable map of workers to choose from. * @param task The task to assign. * - * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. + * @return A {@link io.druid.indexing.overlord.ImmutableWorkerInfo} to run the task if one is available. */ - Optional findWorkerForTask( + Optional findWorkerForTask( final RemoteTaskRunnerConfig config, - final ImmutableMap zkWorkers, + final ImmutableMap zkWorkers, final Task task ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java index 21772f0ce9b..f349399b21a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java @@ -21,9 +21,6 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.DateTime; - -import java.util.concurrent.atomic.AtomicReference; /** * A container for worker metadata. @@ -34,24 +31,19 @@ public class Worker private final String ip; private final int capacity; private final String version; - private final AtomicReference lastCompletedTaskTime; @JsonCreator public Worker( @JsonProperty("host") String host, @JsonProperty("ip") String ip, @JsonProperty("capacity") int capacity, - @JsonProperty("version") String version, - @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime + @JsonProperty("version") String version ) { this.host = host; this.ip = ip; this.capacity = capacity; this.version = version; - this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null - ? DateTime.now() - : lastCompletedTaskTime); } @JsonProperty @@ -78,22 +70,6 @@ public class Worker return version; } - @JsonProperty - public DateTime getLastCompletedTaskTime() - { - return lastCompletedTaskTime.get(); - } - - public void setLastCompletedTaskTime(DateTime completedTaskTime) - { - lastCompletedTaskTime.set(completedTaskTime); - } - - public boolean isValidVersion(final String minVersion) - { - return getVersion().compareTo(minVersion) >= 0; - } - @Override public String toString() { @@ -104,4 +80,39 @@ public class Worker ", version='" + version + '\'' + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Worker worker = (Worker) o; + + if (capacity != worker.capacity) { + return false; + } + if (!host.equals(worker.host)) { + return false; + } + if (!ip.equals(worker.ip)) { + return false; + } + return version.equals(worker.version); + + } + + @Override + public int hashCode() + { + int result = host.hashCode(); + result = 31 * result + ip.hashCode(); + result = 31 * result + capacity; + result = 31 * result + version.hashCode(); + return result; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index a83ec1ecace..9bb3bdc44b6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -80,8 +80,7 @@ public class WorkerResource enabledWorker.getHost(), enabledWorker.getIp(), enabledWorker.getCapacity(), - DISABLED_VERSION, - enabledWorker.getLastCompletedTaskTime() + DISABLED_VERSION ); curatorCoordinator.updateWorkerAnnouncement(disabledWorker); return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java new file mode 100644 index 00000000000..441b06a0cee --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; +import io.druid.indexing.worker.Worker; +import io.druid.jackson.DefaultObjectMapper; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +public class ImmutableWorkerInfoTest +{ + @Test + public void testSerde() throws Exception + { + ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo( + new Worker( + "testWorker", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ); + ObjectMapper mapper = new DefaultObjectMapper(); + final ImmutableWorkerInfo serde = mapper.readValue( + mapper.writeValueAsString(workerInfo), + ImmutableWorkerInfo.class + ); + Assert.assertEquals(workerInfo, serde); + } + + @Test + public void testEqualsAndSerde() + { + // Everything equal + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), true); + + // different worker same tasks + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker1", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker2", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), false); + + // same worker different task groups + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp3", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), false); + + // same worker different tasks + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker1", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker2", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task3"), + new DateTime("2015-01-01T01:01:01Z") + ), false); + + // same worker different capacity + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker1", "192.0.0.1", 10, "v1" + ), + 3, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker2", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), false); + + // same worker different lastCompletedTaskTime + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "testWorker1", "192.0.0.1", 10, "v1" + ), + 3, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z") + ), new ImmutableWorkerInfo( + new Worker( + "testWorker2", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:02Z") + ), false); + + } + + private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch) + { + if (shouldMatch) { + Assert.assertTrue(o1.equals(o2)); + Assert.assertEquals(o1.hashCode(), o2.hashCode()); + } else { + Assert.assertFalse(o1.equals(o2)); + Assert.assertNotEquals(o1.hashCode(), o2.hashCode()); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index d551db3dcba..3bda5f25d80 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -55,7 +55,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; -import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -359,8 +358,8 @@ public class RemoteTaskRunnerTest doSetup(); final Set existingTasks = Sets.newHashSet(); - for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) { - existingTasks.addAll(zkWorker.getRunningTasks().keySet()); + for (ImmutableWorkerInfo workerInfo : remoteTaskRunner.getWorkers()) { + existingTasks.addAll(workerInfo.getRunningTasks()); } Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks); @@ -451,7 +450,7 @@ public class RemoteTaskRunnerTest Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); // Confirm RTR thinks the worker is disabled. - Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion()); + Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion()); } private void doSetup() throws Exception @@ -492,8 +491,7 @@ public class RemoteTaskRunnerTest "worker", "localhost", 3, - "0", - DateTime.now() + "0" ); cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( @@ -506,7 +504,7 @@ public class RemoteTaskRunnerTest { cf.setData().forPath( announcementsPath, - jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now())) + jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "")) ); } @@ -578,10 +576,10 @@ public class RemoteTaskRunnerTest Assert.assertTrue(taskAnnounced(task.getId())); mockWorkerRunningTask(task); Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + new Predicate() { @Override - public boolean apply(Worker input) + public boolean apply(ImmutableWorkerInfo input) { return true; } @@ -599,10 +597,10 @@ public class RemoteTaskRunnerTest remoteTaskRunner.run(task); Assert.assertTrue(taskAnnounced(task.getId())); Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + new Predicate() { @Override - public boolean apply(Worker input) + public boolean apply(ImmutableWorkerInfo input) { return true; } @@ -618,10 +616,10 @@ public class RemoteTaskRunnerTest { doSetup(); Collection lazyworkers = remoteTaskRunner.markWorkersLazy( - new Predicate() + new Predicate() { @Override - public boolean apply(Worker input) + public boolean apply(ImmutableWorkerInfo input) { return true; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java index 4cf2fae0bc9..ce681f84e80 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategyTest.java @@ -33,6 +33,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.RemoteTaskRunner; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; @@ -119,7 +120,7 @@ public class SimpleResourceManagementStrategyTest ); EasyMock.expect(runner.getWorkers()).andReturn( Collections.singletonList( - new TestZkWorker(testTask).getWorker() + new TestZkWorker(testTask).toImmutable() ) ); EasyMock.replay(runner); @@ -155,7 +156,7 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( Collections.singletonList( - new TestZkWorker(testTask).getWorker() + new TestZkWorker(testTask).toImmutable() ) ).times(2); EasyMock.replay(runner); @@ -212,7 +213,7 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( Collections.singletonList( - new TestZkWorker(testTask).getWorker() + new TestZkWorker(testTask).toImmutable() ) ).times(2); EasyMock.replay(runner); @@ -263,14 +264,15 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( Collections.singletonList( - new TestZkWorker(testTask).getWorker() + new TestZkWorker(testTask).toImmutable() ) ).times(2); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.singletonList( - new TestZkWorker(testTask).getWorker() - ) - ); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.singletonList( + new TestZkWorker(testTask).getWorker() + ) + ); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); EasyMock.replay(runner); @@ -305,15 +307,16 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( Collections.singletonList( - new TestZkWorker(testTask).getWorker() + new TestZkWorker(testTask).toImmutable() ) ).times(2); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()).times(2); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.singletonList( - new TestZkWorker(testTask).getWorker() - ) - ); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.singletonList( + new TestZkWorker(testTask).getWorker() + ) + ); EasyMock.replay(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); @@ -354,14 +357,15 @@ public class SimpleResourceManagementStrategyTest ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( Arrays.asList( - new TestZkWorker(NoopTask.create()).getWorker(), - new TestZkWorker(NoopTask.create()).getWorker() + new TestZkWorker(NoopTask.create()).toImmutable(), + new TestZkWorker(NoopTask.create()).toImmutable() ) ).times(2); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.emptyList() - ); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.emptyList() + ); EasyMock.replay(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(runner); @@ -398,14 +402,15 @@ public class SimpleResourceManagementStrategyTest Collections.emptyList() ).times(3); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker() + Collections.singletonList( + new TestZkWorker(NoopTask.create(), "h1", "i1", "0").toImmutable() ) ).times(3); EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.newArrayList()); - EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())).andReturn( - Collections.emptyList() - ); + EasyMock.expect(runner.markWorkersLazy(EasyMock.>anyObject(), EasyMock.anyInt())) + .andReturn( + Collections.emptyList() + ); EasyMock.replay(runner); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( @@ -462,8 +467,8 @@ public class SimpleResourceManagementStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( - new TestZkWorker(null).getWorker() + Collections.singletonList( + new TestZkWorker(null).toImmutable() ) ).times(1); EasyMock.replay(runner); @@ -501,7 +506,7 @@ public class SimpleResourceManagementStrategyTest String version ) { - super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper()); + super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper()); this.testTask = testTask; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index ec64c8b505f..e53dc25d27a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -23,7 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.druid.indexing.common.task.NoopTask; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -38,18 +38,22 @@ public class EqualDistributionWorkerSelectStrategyTest { final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "lhost", - new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("lhost", "lhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), "localhost", - new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 1, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) @@ -61,7 +65,7 @@ public class EqualDistributionWorkerSelectStrategyTest } } ); - ImmutableZkWorker worker = optional.get(); + ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("lhost", worker.getWorker().getHost()); } @@ -71,18 +75,22 @@ public class EqualDistributionWorkerSelectStrategyTest String DISABLED_VERSION = ""; final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "lhost", - new ImmutableZkWorker( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), "localhost", - new ImmutableZkWorker( - new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) @@ -94,7 +102,7 @@ public class EqualDistributionWorkerSelectStrategyTest } } ); - ImmutableZkWorker worker = optional.get(); + ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("enableHost", worker.getWorker().getHost()); } @@ -104,18 +112,22 @@ public class EqualDistributionWorkerSelectStrategyTest String DISABLED_VERSION = ""; final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "lhost", - new ImmutableZkWorker( - new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), "localhost", - new ImmutableZkWorker( - new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) @@ -127,7 +139,7 @@ public class EqualDistributionWorkerSelectStrategyTest } } ); - ImmutableZkWorker worker = optional.get(); + ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("enableHost", worker.getWorker().getHost()); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java index 99a9571b553..bd928d73b48 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java @@ -23,7 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.druid.indexing.common.task.NoopTask; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -41,18 +41,22 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "lhost", - new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("lhost", "lhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), "localhost", - new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) @@ -64,7 +68,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest } } ); - ImmutableZkWorker worker = optional.get(); + ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("localhost", worker.getWorker().getHost()); } @@ -75,23 +79,27 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "lhost", - new ImmutableZkWorker( - new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("lhost", "lhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ), "localhost", - new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) ); - ImmutableZkWorker worker = optional.get(); + ImmutableWorkerInfo worker = optional.get(); Assert.assertEquals("lhost", worker.getWorker().getHost()); } @@ -102,13 +110,15 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost"))) ); - Optional optional = strategy.findWorkerForTask( + Optional optional = strategy.findWorkerForTask( new RemoteTaskRunnerConfig(), ImmutableMap.of( "localhost", - new ImmutableZkWorker( - new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0, - Sets.newHashSet() + new ImmutableWorkerInfo( + new Worker("localhost", "localhost", 1, "v1"), 0, + Sets.newHashSet(), + Sets.newHashSet(), + DateTime.now() ) ), new NoopTask(null, 1, 0, null, null, null) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java index 0daf3a1f6a6..745749cbd68 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ImmutableWorkerInfo; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.jackson.DefaultObjectMapper; import org.easymock.EasyMock; @@ -79,14 +79,14 @@ public class JavaScriptWorkerSelectStrategyTest @Test public void testFindWorkerForTask() { - ImmutableZkWorker worker1 = createMockWorker(1, true, true); - ImmutableZkWorker worker2 = createMockWorker(1, true, true); - ImmutableMap workerMap = ImmutableMap.of( + ImmutableWorkerInfo worker1 = createMockWorker(1, true, true); + ImmutableWorkerInfo worker2 = createMockWorker(1, true, true); + ImmutableMap workerMap = ImmutableMap.of( "10.0.0.1", worker1, "10.0.0.3", worker2 ); - ImmutableZkWorker workerForBatchTask = strategy.findWorkerForTask( + ImmutableWorkerInfo workerForBatchTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("index_hadoop") @@ -94,7 +94,7 @@ public class JavaScriptWorkerSelectStrategyTest // batch tasks should be sent to worker1 Assert.assertEquals(worker1, workerForBatchTask); - ImmutableZkWorker workerForOtherTask = strategy.findWorkerForTask( + ImmutableWorkerInfo workerForOtherTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("other_type") @@ -106,11 +106,11 @@ public class JavaScriptWorkerSelectStrategyTest @Test public void testIsolationOfBatchWorker() { - ImmutableMap workerMap = ImmutableMap.of( + ImmutableMap workerMap = ImmutableMap.of( "10.0.0.1", createMockWorker(1, true, true), "10.0.0.2", createMockWorker(1, true, true) ); - Optional workerForOtherTask = strategy.findWorkerForTask( + Optional workerForOtherTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("other_type") @@ -121,18 +121,18 @@ public class JavaScriptWorkerSelectStrategyTest @Test public void testNoValidWorker() { - ImmutableMap workerMap = ImmutableMap.of( + ImmutableMap workerMap = ImmutableMap.of( "10.0.0.1", createMockWorker(1, true, false), "10.0.0.4", createMockWorker(1, true, false) ); - Optional workerForBatchTask = strategy.findWorkerForTask( + Optional workerForBatchTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("index_hadoop") ); Assert.assertFalse(workerForBatchTask.isPresent()); - Optional workerForOtherTask = strategy.findWorkerForTask( + Optional workerForOtherTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("otherTask") @@ -144,18 +144,18 @@ public class JavaScriptWorkerSelectStrategyTest @Test public void testNoWorkerCanRunTask() { - ImmutableMap workerMap = ImmutableMap.of( + ImmutableMap workerMap = ImmutableMap.of( "10.0.0.1", createMockWorker(1, false, true), "10.0.0.4", createMockWorker(1, false, true) ); - Optional workerForBatchTask = strategy.findWorkerForTask( + Optional workerForBatchTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("index_hadoop") ); Assert.assertFalse(workerForBatchTask.isPresent()); - Optional workerForOtherTask = strategy.findWorkerForTask( + Optional workerForOtherTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("otherTask") @@ -168,11 +168,11 @@ public class JavaScriptWorkerSelectStrategyTest public void testFillWorkerCapacity() { // tasks shoudl be assigned to the worker with maximum currCapacity used until its full - ImmutableMap workerMap = ImmutableMap.of( + ImmutableMap workerMap = ImmutableMap.of( "10.0.0.1", createMockWorker(1, true, true), "10.0.0.2", createMockWorker(5, true, true) ); - Optional workerForBatchTask = strategy.findWorkerForTask( + Optional workerForBatchTask = strategy.findWorkerForTask( new TestRemoteTaskRunnerConfig(new Period("PT1S")), workerMap, createMockTask("index_hadoop") @@ -189,9 +189,9 @@ public class JavaScriptWorkerSelectStrategyTest return mock; } - private ImmutableZkWorker createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion) + private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion) { - ImmutableZkWorker worker = EasyMock.createMock(ImmutableZkWorker.class); + ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class); EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes(); EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes(); EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes(); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index bfe2264926a..d8da4b95523 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -114,8 +114,7 @@ public class WorkerTaskMonitorTest "worker", "localhost", 3, - "0", - DateTime.now() + "0" ); workerCuratorCoordinator = new WorkerCuratorCoordinator( diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index acd05a07241..9aa7a2c423b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -74,8 +74,7 @@ public class WorkerResourceTest "host", "ip", 3, - "v1", - DateTime.now() + "v1" ); curatorCoordinator = new WorkerCuratorCoordinator( diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 03e7e013a4e..a376d1f7a69 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -107,8 +107,7 @@ public class CliMiddleManager extends ServerRunnable node.getHostAndPort(), config.getIp(), config.getCapacity(), - config.getVersion(), - DateTime.now() + config.getVersion() ); } },