Merge pull request #2588 from metamx/abstract-zkworker

Use ImmutableWorkerInfo instead of ZKWorker
This commit is contained in:
Charles Allen 2016-03-15 10:27:21 -07:00
commit 32399bc711
23 changed files with 593 additions and 284 deletions

View File

@ -0,0 +1,150 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import java.util.Collection;
import java.util.Set;
/**
* A snapshot of a Worker and its current state i.e tasks assigned to that worker.
*/
public class ImmutableWorkerInfo
{
private final Worker worker;
private final int currCapacityUsed;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
@JsonCreator
public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
)
{
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
}
@JsonProperty("worker")
public Worker getWorker()
{
return worker;
}
@JsonProperty("currCapacityUsed")
public int getCurrCapacityUsed()
{
return currCapacityUsed;
}
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
}
@JsonProperty("runningTasks")
public Set<String> getRunningTasks()
{
return runningTasks;
}
@JsonProperty("lastCompletedTaskTime")
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime;
}
public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ImmutableWorkerInfo that = (ImmutableWorkerInfo) o;
if (currCapacityUsed != that.currCapacityUsed) {
return false;
}
if (!worker.equals(that.worker)) {
return false;
}
if (!availabilityGroups.equals(that.availabilityGroups)) {
return false;
}
if (!runningTasks.equals(that.runningTasks)) {
return false;
}
return lastCompletedTaskTime.equals(that.lastCompletedTaskTime);
}
@Override
public int hashCode()
{
int result = worker.hashCode();
result = 31 * result + currCapacityUsed;
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
return result;
}
@Override
public String toString()
{
return "ImmutableWorkerInfo{" +
"worker=" + worker +
", currCapacityUsed=" + currCapacityUsed +
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}

View File

@ -1,69 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import java.util.Set;
/**
* A snapshot of a {@link io.druid.indexing.overlord.ZkWorker}
*/
public class ImmutableZkWorker
{
private final Worker worker;
private final int currCapacityUsed;
private final ImmutableSet<String> availabilityGroups;
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
{
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
}
public Worker getWorker()
{
return worker;
}
public int getCurrCapacityUsed()
{
return currCapacityUsed;
}
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
}
public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}
}

View File

@ -107,13 +107,13 @@ import java.util.concurrent.TimeUnit;
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
* <p/>
* <p>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* <p/>
* <p>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* <p>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@ -365,14 +365,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Collection<Worker> getWorkers()
public Collection<ImmutableWorkerInfo> getWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}
public Collection<ZkWorker> getZkWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
return getImmutableWorkerFromZK(zkWorkers.values());
}
@Override
@ -672,7 +667,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
ZkWorker assignedWorker = null;
try {
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
final Optional<ImmutableWorkerInfo> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
@ -687,10 +682,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableZkWorker transformEntry(
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
{
@ -712,7 +707,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
} finally {
}
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
// note that this is essential as a task might not get a worker because a worker was assigned another task.
@ -1092,7 +1088,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
@ -1101,7 +1097,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
@ -1146,6 +1142,23 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
{
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableWorkerInfo apply(ZkWorker input)
{
return input.toImmutable();
}
}
)
);
}
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(

View File

@ -29,7 +29,7 @@ public interface WorkerTaskRunner extends TaskRunner
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();
Collection<ImmutableWorkerInfo> getWorkers();
/**
* Return a list of workers who can be reaped by autoscaling
@ -43,5 +43,5 @@ public interface WorkerTaskRunner extends TaskRunner
* @param maxWorkers
* @return
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
}

View File

@ -50,6 +50,7 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
@ -128,7 +129,7 @@ public class ZkWorker implements Closeable
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return worker.get().getLastCompletedTaskTime();
return lastCompletedTaskTime.get();
}
public boolean isRunningTask(String taskId)
@ -138,7 +139,7 @@ public class ZkWorker implements Closeable
public boolean isValidVersion(String minVersion)
{
return worker.get().isValidVersion(minVersion);
return worker.get().getVersion().compareTo(minVersion) >= 0;
}
public void setWorker(Worker newWorker)
@ -152,12 +153,13 @@ public class ZkWorker implements Closeable
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
worker.get().setLastCompletedTaskTime(completedTaskTime);
lastCompletedTaskTime.set(completedTaskTime);
}
public ImmutableZkWorker toImmutable()
public ImmutableWorkerInfo toImmutable()
{
return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get());
}
@Override
@ -171,6 +173,7 @@ public class ZkWorker implements Closeable
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}

View File

@ -34,8 +34,9 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
@ -103,7 +104,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
boolean doProvision(WorkerTaskRunner runner)
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
Collection<Worker> workers = getWorkers(runner);
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@ -111,19 +112,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workers,
new Function<Worker, String>()
new Function<ImmutableWorkerInfo, String>()
{
@Override
public String apply(Worker input)
public String apply(ImmutableWorkerInfo input)
{
return input.getIp();
return input.getWorker().getIp();
}
}
)
@ -206,14 +207,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
Collection<Worker> workers = getWorkers(runner);
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
final Collection<String> laziestWorkerIps =
Collections2.transform(
runner.markWorkersLazy(isLazyWorker, excessWorkers),
@ -334,16 +335,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private static Predicate<Worker> createLazyWorkerPredicate(
private static Predicate<ImmutableWorkerInfo> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config
)
{
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);
return new Predicate<Worker>()
return new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(Worker worker)
public boolean apply(ImmutableWorkerInfo worker)
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
@ -352,14 +353,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
};
}
private static Predicate<Worker> createValidWorkerPredicate(
private static Predicate<ImmutableWorkerInfo> createValidWorkerPredicate(
final SimpleResourceManagementConfig config
)
{
return new Predicate<Worker>()
return new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(Worker worker)
public boolean apply(ImmutableWorkerInfo worker)
{
final String minVersion = config.getWorkerVersion();
if (minVersion == null) {
@ -373,15 +374,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private void updateTargetWorkerCount(
final WorkerBehaviorConfig workerConfig,
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
final Collection<Worker> zkWorkers
final Collection<ImmutableWorkerInfo> zkWorkers
)
{
synchronized (lock) {
final Collection<Worker> validWorkers = Collections2.filter(
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config)
);
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
@ -464,7 +465,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
}
public Collection<Worker> getWorkers(WorkerTaskRunner runner)
public Collection<ImmutableWorkerInfo> getWorkers(WorkerTaskRunner runner)
{
return runner.getWorkers();
}

View File

@ -416,10 +416,7 @@ public class OverlordResource
@Override
public Response apply(TaskRunner taskRunner)
{
if (taskRunner instanceof RemoteTaskRunner) {
// Use getZkWorkers instead of getWorkers, as they return richer details (like the list of running tasks)
return Response.ok(((RemoteTaskRunner) taskRunner).getZkWorkers()).build();
} else if (taskRunner instanceof WorkerTaskRunner) {
if (taskRunner instanceof WorkerTaskRunner) {
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
} else {
log.debug(

View File

@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.Comparator;
@ -35,16 +35,16 @@ import java.util.TreeSet;
public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy
{
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
public Optional<ImmutableWorkerInfo> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
final TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableZkWorker>()
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableWorkerInfo>()
{
@Override
public int compare(
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
)
{
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
@ -64,7 +64,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
for (ImmutableZkWorker zkWorker : sortedWorkers) {
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}

View File

@ -25,7 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.List;
@ -58,37 +58,37 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>();
ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build();
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();
if (!affinityWorkers.isEmpty()) {
Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(config, affinityWorkers, task);
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}

View File

@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.Comparator;
@ -35,18 +35,18 @@ import java.util.TreeSet;
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableZkWorker>()
TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableWorkerInfo>()
{
@Override
public int compare(
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
@ -66,7 +66,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
for (ImmutableZkWorker zkWorker : sortedWorkers) {
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}

View File

@ -26,7 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import javax.script.Compilable;
@ -39,7 +39,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
{
public static interface SelectorFunction
{
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task);
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task);
}
private final SelectorFunction fnSelector;
@ -61,8 +61,8 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
public Optional<ImmutableWorkerInfo> findWorkerForTask(
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
)
{
String worker = fnSelector.apply(config, zkWorkers, task);

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
/**
@ -46,11 +46,11 @@ public interface WorkerSelectStrategy
* @param zkWorkers An immutable map of workers to choose from.
* @param task The task to assign.
*
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
* @return A {@link io.druid.indexing.overlord.ImmutableWorkerInfo} to run the task if one is available.
*/
Optional<ImmutableZkWorker> findWorkerForTask(
Optional<ImmutableWorkerInfo> findWorkerForTask(
final RemoteTaskRunnerConfig config,
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
);
}

View File

@ -21,9 +21,6 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicReference;
/**
* A container for worker metadata.
@ -34,24 +31,19 @@ public class Worker
private final String ip;
private final int capacity;
private final String version;
private final AtomicReference<DateTime> lastCompletedTaskTime;
@JsonCreator
public Worker(
@JsonProperty("host") String host,
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
@JsonProperty("version") String version
)
{
this.host = host;
this.ip = ip;
this.capacity = capacity;
this.version = version;
this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null
? DateTime.now()
: lastCompletedTaskTime);
}
@JsonProperty
@ -78,22 +70,6 @@ public class Worker
return version;
}
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
}
public boolean isValidVersion(final String minVersion)
{
return getVersion().compareTo(minVersion) >= 0;
}
@Override
public String toString()
{
@ -104,4 +80,39 @@ public class Worker
", version='" + version + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Worker worker = (Worker) o;
if (capacity != worker.capacity) {
return false;
}
if (!host.equals(worker.host)) {
return false;
}
if (!ip.equals(worker.ip)) {
return false;
}
return version.equals(worker.version);
}
@Override
public int hashCode()
{
int result = host.hashCode();
result = 31 * result + ip.hashCode();
result = 31 * result + capacity;
result = 31 * result + version.hashCode();
return result;
}
}

View File

@ -80,8 +80,7 @@ public class WorkerResource
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getLastCompletedTaskTime()
DISABLED_VERSION
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();

View File

@ -0,0 +1,182 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
public class ImmutableWorkerInfoTest
{
@Test
public void testSerde() throws Exception
{
ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
new Worker(
"testWorker", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
);
ObjectMapper mapper = new DefaultObjectMapper();
final ImmutableWorkerInfo serde = mapper.readValue(
mapper.writeValueAsString(workerInfo),
ImmutableWorkerInfo.class
);
Assert.assertEquals(workerInfo, serde);
}
@Test
public void testEqualsAndSerde()
{
// Everything equal
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), true);
// different worker same tasks
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker1", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker2", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), false);
// same worker different task groups
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp3", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), false);
// same worker different tasks
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker1", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker2", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task3"),
new DateTime("2015-01-01T01:01:01Z")
), false);
// same worker different capacity
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker1", "192.0.0.1", 10, "v1"
),
3,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker2", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), false);
// same worker different lastCompletedTaskTime
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"testWorker1", "192.0.0.1", 10, "v1"
),
3,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z")
), new ImmutableWorkerInfo(
new Worker(
"testWorker2", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:02Z")
), false);
}
private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)
{
if (shouldMatch) {
Assert.assertTrue(o1.equals(o2));
Assert.assertEquals(o1.hashCode(), o2.hashCode());
} else {
Assert.assertFalse(o1.equals(o2));
Assert.assertNotEquals(o1.hashCode(), o2.hashCode());
}
}
}

View File

@ -55,7 +55,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@ -359,8 +358,8 @@ public class RemoteTaskRunnerTest
doSetup();
final Set<String> existingTasks = Sets.newHashSet();
for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
for (ImmutableWorkerInfo workerInfo : remoteTaskRunner.getWorkers()) {
existingTasks.addAll(workerInfo.getRunningTasks());
}
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
@ -451,7 +450,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
// Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion());
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
}
private void doSetup() throws Exception
@ -492,8 +491,7 @@ public class RemoteTaskRunnerTest
"worker",
"localhost",
3,
"0",
DateTime.now()
"0"
);
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@ -506,7 +504,7 @@ public class RemoteTaskRunnerTest
{
cf.setData().forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now()))
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
);
}
@ -578,10 +576,10 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(Worker input)
public boolean apply(ImmutableWorkerInfo input)
{
return true;
}
@ -599,10 +597,10 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(Worker input)
public boolean apply(ImmutableWorkerInfo input)
{
return true;
}
@ -618,10 +616,10 @@ public class RemoteTaskRunnerTest
{
doSetup();
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<Worker>()
new Predicate<ImmutableWorkerInfo>()
{
@Override
public boolean apply(Worker input)
public boolean apply(ImmutableWorkerInfo input)
{
return true;
}

View File

@ -33,6 +33,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
@ -119,7 +120,7 @@ public class SimpleResourceManagementStrategyTest
);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
new TestZkWorker(testTask).toImmutable()
)
);
EasyMock.replay(runner);
@ -155,7 +156,7 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
new TestZkWorker(testTask).toImmutable()
)
).times(2);
EasyMock.replay(runner);
@ -212,7 +213,7 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
new TestZkWorker(testTask).toImmutable()
)
).times(2);
EasyMock.replay(runner);
@ -263,10 +264,11 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
new TestZkWorker(testTask).toImmutable()
)
).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>singletonList(
new TestZkWorker(testTask).getWorker()
)
@ -305,11 +307,12 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
new TestZkWorker(testTask).toImmutable()
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
@ -354,12 +357,13 @@ public class SimpleResourceManagementStrategyTest
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Arrays.asList(
new TestZkWorker(NoopTask.create()).getWorker(),
new TestZkWorker(NoopTask.create()).getWorker()
new TestZkWorker(NoopTask.create()).toImmutable(),
new TestZkWorker(NoopTask.create()).toImmutable()
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.replay(runner);
@ -398,12 +402,13 @@ public class SimpleResourceManagementStrategyTest
Collections.<RemoteTaskRunnerWorkItem>emptyList()
).times(3);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<Worker>singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker()
Collections.singletonList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0").toImmutable()
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.replay(runner);
@ -462,8 +467,8 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
Collections.<Worker>singletonList(
new TestZkWorker(null).getWorker()
Collections.singletonList(
new TestZkWorker(null).toImmutable()
)
).times(1);
EasyMock.replay(runner);
@ -501,7 +506,7 @@ public class SimpleResourceManagementStrategyTest
String version
)
{
super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper());
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
this.testTask = testTask;
}

View File

@ -23,7 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
@ -38,18 +38,22 @@ public class EqualDistributionWorkerSelectStrategyTest
{
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
@ -61,7 +65,7 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableZkWorker worker = optional.get();
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -71,18 +75,22 @@ public class EqualDistributionWorkerSelectStrategyTest
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
@ -94,7 +102,7 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableZkWorker worker = optional.get();
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@ -104,18 +112,22 @@ public class EqualDistributionWorkerSelectStrategyTest
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
@ -127,7 +139,7 @@ public class EqualDistributionWorkerSelectStrategyTest
}
}
);
ImmutableZkWorker worker = optional.get();
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
}

View File

@ -23,7 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
@ -41,18 +41,22 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
@ -64,7 +68,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
}
}
);
ImmutableZkWorker worker = optional.get();
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@ -75,23 +79,27 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableZkWorker worker = optional.get();
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -102,13 +110,15 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
Sets.<String>newHashSet()
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
@ -79,14 +79,14 @@ public class JavaScriptWorkerSelectStrategyTest
@Test
public void testFindWorkerForTask()
{
ImmutableZkWorker worker1 = createMockWorker(1, true, true);
ImmutableZkWorker worker2 = createMockWorker(1, true, true);
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
ImmutableWorkerInfo worker1 = createMockWorker(1, true, true);
ImmutableWorkerInfo worker2 = createMockWorker(1, true, true);
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
"10.0.0.1", worker1,
"10.0.0.3", worker2
);
ImmutableZkWorker workerForBatchTask = strategy.findWorkerForTask(
ImmutableWorkerInfo workerForBatchTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
@ -94,7 +94,7 @@ public class JavaScriptWorkerSelectStrategyTest
// batch tasks should be sent to worker1
Assert.assertEquals(worker1, workerForBatchTask);
ImmutableZkWorker workerForOtherTask = strategy.findWorkerForTask(
ImmutableWorkerInfo workerForOtherTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("other_type")
@ -106,11 +106,11 @@ public class JavaScriptWorkerSelectStrategyTest
@Test
public void testIsolationOfBatchWorker()
{
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
"10.0.0.1", createMockWorker(1, true, true),
"10.0.0.2", createMockWorker(1, true, true)
);
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("other_type")
@ -121,18 +121,18 @@ public class JavaScriptWorkerSelectStrategyTest
@Test
public void testNoValidWorker()
{
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
"10.0.0.1", createMockWorker(1, true, false),
"10.0.0.4", createMockWorker(1, true, false)
);
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
);
Assert.assertFalse(workerForBatchTask.isPresent());
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("otherTask")
@ -144,18 +144,18 @@ public class JavaScriptWorkerSelectStrategyTest
@Test
public void testNoWorkerCanRunTask()
{
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
"10.0.0.1", createMockWorker(1, false, true),
"10.0.0.4", createMockWorker(1, false, true)
);
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
);
Assert.assertFalse(workerForBatchTask.isPresent());
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("otherTask")
@ -168,11 +168,11 @@ public class JavaScriptWorkerSelectStrategyTest
public void testFillWorkerCapacity()
{
// tasks shoudl be assigned to the worker with maximum currCapacity used until its full
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
"10.0.0.1", createMockWorker(1, true, true),
"10.0.0.2", createMockWorker(5, true, true)
);
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
workerMap,
createMockTask("index_hadoop")
@ -189,9 +189,9 @@ public class JavaScriptWorkerSelectStrategyTest
return mock;
}
private ImmutableZkWorker createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
{
ImmutableZkWorker worker = EasyMock.createMock(ImmutableZkWorker.class);
ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class);
EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();

View File

@ -114,8 +114,7 @@ public class WorkerTaskMonitorTest
"worker",
"localhost",
3,
"0",
DateTime.now()
"0"
);
workerCuratorCoordinator = new WorkerCuratorCoordinator(

View File

@ -74,8 +74,7 @@ public class WorkerResourceTest
"host",
"ip",
3,
"v1",
DateTime.now()
"v1"
);
curatorCoordinator = new WorkerCuratorCoordinator(

View File

@ -107,8 +107,7 @@ public class CliMiddleManager extends ServerRunnable
node.getHostAndPort(),
config.getIp(),
config.getCapacity(),
config.getVersion(),
DateTime.now()
config.getVersion()
);
}
},