mirror of
https://github.com/apache/druid.git
synced 2025-02-09 03:24:55 +00:00
Use ImmutableWorkerInfo instead of ZKWorker
review comments add test for equals and hashcode
This commit is contained in:
parent
d51a0a0cf4
commit
9cceff2274
@ -0,0 +1,150 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.indexing.overlord;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import io.druid.indexing.common.task.Task;
|
||||||
|
import io.druid.indexing.worker.Worker;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A snapshot of a Worker and its current state i.e tasks assigned to that worker.
|
||||||
|
*/
|
||||||
|
public class ImmutableWorkerInfo
|
||||||
|
{
|
||||||
|
private final Worker worker;
|
||||||
|
private final int currCapacityUsed;
|
||||||
|
private final ImmutableSet<String> availabilityGroups;
|
||||||
|
private final ImmutableSet<String> runningTasks;
|
||||||
|
private final DateTime lastCompletedTaskTime;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public ImmutableWorkerInfo(
|
||||||
|
@JsonProperty("worker") Worker worker,
|
||||||
|
@JsonProperty("currCapacityUsed") int currCapacityUsed,
|
||||||
|
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
|
||||||
|
@JsonProperty("runningTasks") Collection<String> runningTasks,
|
||||||
|
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.worker = worker;
|
||||||
|
this.currCapacityUsed = currCapacityUsed;
|
||||||
|
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
|
||||||
|
this.runningTasks = ImmutableSet.copyOf(runningTasks);
|
||||||
|
this.lastCompletedTaskTime = lastCompletedTaskTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("worker")
|
||||||
|
public Worker getWorker()
|
||||||
|
{
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("currCapacityUsed")
|
||||||
|
public int getCurrCapacityUsed()
|
||||||
|
{
|
||||||
|
return currCapacityUsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("availabilityGroups")
|
||||||
|
public Set<String> getAvailabilityGroups()
|
||||||
|
{
|
||||||
|
return availabilityGroups;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("runningTasks")
|
||||||
|
public Set<String> getRunningTasks()
|
||||||
|
{
|
||||||
|
return runningTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty("lastCompletedTaskTime")
|
||||||
|
public DateTime getLastCompletedTaskTime()
|
||||||
|
{
|
||||||
|
return lastCompletedTaskTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isValidVersion(String minVersion)
|
||||||
|
{
|
||||||
|
return worker.getVersion().compareTo(minVersion) >= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean canRunTask(Task task)
|
||||||
|
{
|
||||||
|
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
|
||||||
|
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmutableWorkerInfo that = (ImmutableWorkerInfo) o;
|
||||||
|
|
||||||
|
if (currCapacityUsed != that.currCapacityUsed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!worker.equals(that.worker)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!availabilityGroups.equals(that.availabilityGroups)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!runningTasks.equals(that.runningTasks)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return lastCompletedTaskTime.equals(that.lastCompletedTaskTime);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = worker.hashCode();
|
||||||
|
result = 31 * result + currCapacityUsed;
|
||||||
|
result = 31 * result + availabilityGroups.hashCode();
|
||||||
|
result = 31 * result + runningTasks.hashCode();
|
||||||
|
result = 31 * result + lastCompletedTaskTime.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "ImmutableWorkerInfo{" +
|
||||||
|
"worker=" + worker +
|
||||||
|
", currCapacityUsed=" + currCapacityUsed +
|
||||||
|
", availabilityGroups=" + availabilityGroups +
|
||||||
|
", runningTasks=" + runningTasks +
|
||||||
|
", lastCompletedTaskTime=" + lastCompletedTaskTime +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
@ -1,69 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. Metamarkets licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.indexing.overlord;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import io.druid.indexing.common.task.Task;
|
|
||||||
import io.druid.indexing.worker.Worker;
|
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A snapshot of a {@link io.druid.indexing.overlord.ZkWorker}
|
|
||||||
*/
|
|
||||||
public class ImmutableZkWorker
|
|
||||||
{
|
|
||||||
private final Worker worker;
|
|
||||||
private final int currCapacityUsed;
|
|
||||||
private final ImmutableSet<String> availabilityGroups;
|
|
||||||
|
|
||||||
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
|
|
||||||
{
|
|
||||||
this.worker = worker;
|
|
||||||
this.currCapacityUsed = currCapacityUsed;
|
|
||||||
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Worker getWorker()
|
|
||||||
{
|
|
||||||
return worker;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getCurrCapacityUsed()
|
|
||||||
{
|
|
||||||
return currCapacityUsed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<String> getAvailabilityGroups()
|
|
||||||
{
|
|
||||||
return availabilityGroups;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isValidVersion(String minVersion)
|
|
||||||
{
|
|
||||||
return worker.getVersion().compareTo(minVersion) >= 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean canRunTask(Task task)
|
|
||||||
{
|
|
||||||
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
|
|
||||||
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
|
|
||||||
}
|
|
||||||
}
|
|
@ -107,13 +107,13 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
|
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
|
||||||
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
|
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
|
||||||
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
|
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
|
||||||
* <p/>
|
* <p>
|
||||||
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
|
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
|
||||||
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
|
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
|
||||||
* <p/>
|
* <p>
|
||||||
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
|
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
|
||||||
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
|
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
|
||||||
* <p/>
|
* <p>
|
||||||
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
|
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
|
||||||
*/
|
*/
|
||||||
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
@ -365,14 +365,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<Worker> getWorkers()
|
public Collection<ImmutableWorkerInfo> getWorkers()
|
||||||
{
|
{
|
||||||
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
|
return getImmutableWorkerFromZK(zkWorkers.values());
|
||||||
}
|
|
||||||
|
|
||||||
public Collection<ZkWorker> getZkWorkers()
|
|
||||||
{
|
|
||||||
return ImmutableList.copyOf(zkWorkers.values());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -672,7 +667,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
|
|
||||||
ZkWorker assignedWorker = null;
|
ZkWorker assignedWorker = null;
|
||||||
try {
|
try {
|
||||||
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
|
final Optional<ImmutableWorkerInfo> immutableZkWorker = strategy.findWorkerForTask(
|
||||||
config,
|
config,
|
||||||
ImmutableMap.copyOf(
|
ImmutableMap.copyOf(
|
||||||
Maps.transformEntries(
|
Maps.transformEntries(
|
||||||
@ -687,10 +682,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
|
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public ImmutableZkWorker transformEntry(
|
public ImmutableWorkerInfo transformEntry(
|
||||||
String key, ZkWorker value
|
String key, ZkWorker value
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
@ -712,7 +707,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
|
|
||||||
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
|
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
|
||||||
return false;
|
return false;
|
||||||
} finally {
|
}
|
||||||
|
finally {
|
||||||
if (assignedWorker != null) {
|
if (assignedWorker != null) {
|
||||||
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
|
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
|
||||||
// note that this is essential as a task might not get a worker because a worker was assigned another task.
|
// note that this is essential as a task might not get a worker because a worker was assigned another task.
|
||||||
@ -1092,7 +1088,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
|
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
|
||||||
{
|
{
|
||||||
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
|
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
|
||||||
synchronized (statusLock) {
|
synchronized (statusLock) {
|
||||||
@ -1101,7 +1097,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
String worker = iterator.next();
|
String worker = iterator.next();
|
||||||
ZkWorker zkWorker = zkWorkers.get(worker);
|
ZkWorker zkWorker = zkWorkers.get(worker);
|
||||||
try {
|
try {
|
||||||
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
|
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
|
||||||
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
|
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
|
||||||
lazyWorkers.put(worker, zkWorker);
|
lazyWorkers.put(worker, zkWorker);
|
||||||
if (lazyWorkers.size() == maxWorkers) {
|
if (lazyWorkers.size() == maxWorkers) {
|
||||||
@ -1146,6 +1142,23 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||||||
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
|
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
|
||||||
|
{
|
||||||
|
return ImmutableList.copyOf(
|
||||||
|
Collections2.transform(
|
||||||
|
workers,
|
||||||
|
new Function<ZkWorker, ImmutableWorkerInfo>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ImmutableWorkerInfo apply(ZkWorker input)
|
||||||
|
{
|
||||||
|
return input.toImmutable();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
|
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
|
||||||
{
|
{
|
||||||
return Collections2.transform(
|
return Collections2.transform(
|
||||||
|
@ -29,7 +29,7 @@ public interface WorkerTaskRunner extends TaskRunner
|
|||||||
* List of known workers who can accept tasks
|
* List of known workers who can accept tasks
|
||||||
* @return A list of workers who can accept tasks for running
|
* @return A list of workers who can accept tasks for running
|
||||||
*/
|
*/
|
||||||
Collection<Worker> getWorkers();
|
Collection<ImmutableWorkerInfo> getWorkers();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a list of workers who can be reaped by autoscaling
|
* Return a list of workers who can be reaped by autoscaling
|
||||||
@ -43,5 +43,5 @@ public interface WorkerTaskRunner extends TaskRunner
|
|||||||
* @param maxWorkers
|
* @param maxWorkers
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
|
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,7 @@ public class ZkWorker implements Closeable
|
|||||||
private final Function<ChildData, TaskAnnouncement> cacheConverter;
|
private final Function<ChildData, TaskAnnouncement> cacheConverter;
|
||||||
|
|
||||||
private AtomicReference<Worker> worker;
|
private AtomicReference<Worker> worker;
|
||||||
|
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
|
||||||
|
|
||||||
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
|
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
|
||||||
{
|
{
|
||||||
@ -128,7 +129,7 @@ public class ZkWorker implements Closeable
|
|||||||
@JsonProperty
|
@JsonProperty
|
||||||
public DateTime getLastCompletedTaskTime()
|
public DateTime getLastCompletedTaskTime()
|
||||||
{
|
{
|
||||||
return worker.get().getLastCompletedTaskTime();
|
return lastCompletedTaskTime.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRunningTask(String taskId)
|
public boolean isRunningTask(String taskId)
|
||||||
@ -138,7 +139,7 @@ public class ZkWorker implements Closeable
|
|||||||
|
|
||||||
public boolean isValidVersion(String minVersion)
|
public boolean isValidVersion(String minVersion)
|
||||||
{
|
{
|
||||||
return worker.get().isValidVersion(minVersion);
|
return worker.get().getVersion().compareTo(minVersion) >= 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setWorker(Worker newWorker)
|
public void setWorker(Worker newWorker)
|
||||||
@ -152,12 +153,13 @@ public class ZkWorker implements Closeable
|
|||||||
|
|
||||||
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
||||||
{
|
{
|
||||||
worker.get().setLastCompletedTaskTime(completedTaskTime);
|
lastCompletedTaskTime.set(completedTaskTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ImmutableZkWorker toImmutable()
|
public ImmutableWorkerInfo toImmutable()
|
||||||
{
|
{
|
||||||
return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
|
|
||||||
|
return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -171,6 +173,7 @@ public class ZkWorker implements Closeable
|
|||||||
{
|
{
|
||||||
return "ZkWorker{" +
|
return "ZkWorker{" +
|
||||||
"worker=" + worker +
|
"worker=" + worker +
|
||||||
|
", lastCompletedTaskTime=" + lastCompletedTaskTime +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,8 +34,9 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
|||||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.granularity.PeriodGranularity;
|
import io.druid.granularity.PeriodGranularity;
|
||||||
import io.druid.indexing.overlord.WorkerTaskRunner;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.TaskRunnerWorkItem;
|
import io.druid.indexing.overlord.TaskRunnerWorkItem;
|
||||||
|
import io.druid.indexing.overlord.WorkerTaskRunner;
|
||||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||||
import io.druid.indexing.worker.Worker;
|
import io.druid.indexing.worker.Worker;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
@ -103,7 +104,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
boolean doProvision(WorkerTaskRunner runner)
|
boolean doProvision(WorkerTaskRunner runner)
|
||||||
{
|
{
|
||||||
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
|
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
|
||||||
Collection<Worker> workers = getWorkers(runner);
|
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
boolean didProvision = false;
|
boolean didProvision = false;
|
||||||
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
|
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
|
||||||
@ -111,19 +112,19 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
log.warn("No workerConfig available, cannot provision new workers.");
|
log.warn("No workerConfig available, cannot provision new workers.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
|
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);
|
||||||
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
|
final int currValidWorkers = Collections2.filter(workers, isValidWorker).size();
|
||||||
|
|
||||||
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
|
final List<String> workerNodeIds = workerConfig.getAutoScaler().ipToIdLookup(
|
||||||
Lists.newArrayList(
|
Lists.newArrayList(
|
||||||
Iterables.transform(
|
Iterables.transform(
|
||||||
workers,
|
workers,
|
||||||
new Function<Worker, String>()
|
new Function<ImmutableWorkerInfo, String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public String apply(Worker input)
|
public String apply(ImmutableWorkerInfo input)
|
||||||
{
|
{
|
||||||
return input.getIp();
|
return input.getWorker().getIp();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -206,14 +207,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
currentlyTerminating.clear();
|
currentlyTerminating.clear();
|
||||||
currentlyTerminating.addAll(stillExisting);
|
currentlyTerminating.addAll(stillExisting);
|
||||||
|
|
||||||
Collection<Worker> workers = getWorkers(runner);
|
Collection<ImmutableWorkerInfo> workers = getWorkers(runner);
|
||||||
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
|
updateTargetWorkerCount(workerConfig, pendingTasks, workers);
|
||||||
|
|
||||||
if (currentlyTerminating.isEmpty()) {
|
if (currentlyTerminating.isEmpty()) {
|
||||||
|
|
||||||
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
|
final int excessWorkers = (workers.size() + currentlyProvisioning.size()) - targetWorkerCount;
|
||||||
if (excessWorkers > 0) {
|
if (excessWorkers > 0) {
|
||||||
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
|
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
|
||||||
final Collection<String> laziestWorkerIps =
|
final Collection<String> laziestWorkerIps =
|
||||||
Collections2.transform(
|
Collections2.transform(
|
||||||
runner.markWorkersLazy(isLazyWorker, excessWorkers),
|
runner.markWorkersLazy(isLazyWorker, excessWorkers),
|
||||||
@ -334,16 +335,16 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
return scalingStats;
|
return scalingStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Predicate<Worker> createLazyWorkerPredicate(
|
private static Predicate<ImmutableWorkerInfo> createLazyWorkerPredicate(
|
||||||
final SimpleResourceManagementConfig config
|
final SimpleResourceManagementConfig config
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
final Predicate<Worker> isValidWorker = createValidWorkerPredicate(config);
|
final Predicate<ImmutableWorkerInfo> isValidWorker = createValidWorkerPredicate(config);
|
||||||
|
|
||||||
return new Predicate<Worker>()
|
return new Predicate<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Worker worker)
|
public boolean apply(ImmutableWorkerInfo worker)
|
||||||
{
|
{
|
||||||
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
|
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
|
||||||
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
|
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
|
||||||
@ -352,14 +353,14 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Predicate<Worker> createValidWorkerPredicate(
|
private static Predicate<ImmutableWorkerInfo> createValidWorkerPredicate(
|
||||||
final SimpleResourceManagementConfig config
|
final SimpleResourceManagementConfig config
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return new Predicate<Worker>()
|
return new Predicate<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Worker worker)
|
public boolean apply(ImmutableWorkerInfo worker)
|
||||||
{
|
{
|
||||||
final String minVersion = config.getWorkerVersion();
|
final String minVersion = config.getWorkerVersion();
|
||||||
if (minVersion == null) {
|
if (minVersion == null) {
|
||||||
@ -373,15 +374,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
private void updateTargetWorkerCount(
|
private void updateTargetWorkerCount(
|
||||||
final WorkerBehaviorConfig workerConfig,
|
final WorkerBehaviorConfig workerConfig,
|
||||||
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
|
final Collection<? extends TaskRunnerWorkItem> pendingTasks,
|
||||||
final Collection<Worker> zkWorkers
|
final Collection<ImmutableWorkerInfo> zkWorkers
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
final Collection<Worker> validWorkers = Collections2.filter(
|
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
|
||||||
zkWorkers,
|
zkWorkers,
|
||||||
createValidWorkerPredicate(config)
|
createValidWorkerPredicate(config)
|
||||||
);
|
);
|
||||||
final Predicate<Worker> isLazyWorker = createLazyWorkerPredicate(config);
|
final Predicate<ImmutableWorkerInfo> isLazyWorker = createLazyWorkerPredicate(config);
|
||||||
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
|
final int minWorkerCount = workerConfig.getAutoScaler().getMinNumWorkers();
|
||||||
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
|
final int maxWorkerCount = workerConfig.getAutoScaler().getMaxNumWorkers();
|
||||||
|
|
||||||
@ -464,7 +465,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<Worker> getWorkers(WorkerTaskRunner runner)
|
public Collection<ImmutableWorkerInfo> getWorkers(WorkerTaskRunner runner)
|
||||||
{
|
{
|
||||||
return runner.getWorkers();
|
return runner.getWorkers();
|
||||||
}
|
}
|
||||||
|
@ -416,10 +416,7 @@ public class OverlordResource
|
|||||||
@Override
|
@Override
|
||||||
public Response apply(TaskRunner taskRunner)
|
public Response apply(TaskRunner taskRunner)
|
||||||
{
|
{
|
||||||
if (taskRunner instanceof RemoteTaskRunner) {
|
if (taskRunner instanceof WorkerTaskRunner) {
|
||||||
// Use getZkWorkers instead of getWorkers, as they return richer details (like the list of running tasks)
|
|
||||||
return Response.ok(((RemoteTaskRunner) taskRunner).getZkWorkers()).build();
|
|
||||||
} else if (taskRunner instanceof WorkerTaskRunner) {
|
|
||||||
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
|
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
|
||||||
} else {
|
} else {
|
||||||
log.debug(
|
log.debug(
|
||||||
|
@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
|||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -35,16 +35,16 @@ import java.util.TreeSet;
|
|||||||
public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy
|
public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrategy
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
public Optional<ImmutableWorkerInfo> findWorkerForTask(
|
||||||
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
|
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
final TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(
|
final TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
|
||||||
new Comparator<ImmutableZkWorker>()
|
new Comparator<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public int compare(
|
public int compare(
|
||||||
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
|
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
|
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
|
||||||
@ -64,7 +64,7 @@ public class EqualDistributionWorkerSelectStrategy implements WorkerSelectStrate
|
|||||||
sortedWorkers.addAll(zkWorkers.values());
|
sortedWorkers.addAll(zkWorkers.values());
|
||||||
final String minWorkerVer = config.getMinWorkerVersion();
|
final String minWorkerVer = config.getMinWorkerVersion();
|
||||||
|
|
||||||
for (ImmutableZkWorker zkWorker : sortedWorkers) {
|
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
|
||||||
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
|
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
|
||||||
return Optional.of(zkWorker);
|
return Optional.of(zkWorker);
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ import com.google.common.base.Optional;
|
|||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -58,37 +58,37 @@ public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWo
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
public Optional<ImmutableWorkerInfo> findWorkerForTask(
|
||||||
final RemoteTaskRunnerConfig config,
|
final RemoteTaskRunnerConfig config,
|
||||||
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
|
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
|
||||||
final Task task
|
final Task task
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
|
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
|
||||||
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
|
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
|
||||||
for (String workerHost : zkWorkers.keySet()) {
|
for (String workerHost : zkWorkers.keySet()) {
|
||||||
if (!affinityWorkerHosts.contains(workerHost)) {
|
if (!affinityWorkerHosts.contains(workerHost)) {
|
||||||
builder.put(workerHost, zkWorkers.get(workerHost));
|
builder.put(workerHost, zkWorkers.get(workerHost));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
|
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();
|
||||||
|
|
||||||
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
|
List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
|
||||||
if (workerHosts == null) {
|
if (workerHosts == null) {
|
||||||
return super.findWorkerForTask(config, eligibleWorkers, task);
|
return super.findWorkerForTask(config, eligibleWorkers, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>();
|
ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
|
||||||
for (String workerHost : workerHosts) {
|
for (String workerHost : workerHosts) {
|
||||||
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
|
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
|
||||||
if (zkWorker != null) {
|
if (zkWorker != null) {
|
||||||
affinityBuilder.put(workerHost, zkWorker);
|
affinityBuilder.put(workerHost, zkWorker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build();
|
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();
|
||||||
|
|
||||||
if (!affinityWorkers.isEmpty()) {
|
if (!affinityWorkers.isEmpty()) {
|
||||||
Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(config, affinityWorkers, task);
|
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
|
||||||
if (retVal.isPresent()) {
|
if (retVal.isPresent()) {
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
|||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -35,18 +35,18 @@ import java.util.TreeSet;
|
|||||||
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
|
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
public Optional<ImmutableWorkerInfo> findWorkerForTask(
|
||||||
final RemoteTaskRunnerConfig config,
|
final RemoteTaskRunnerConfig config,
|
||||||
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
|
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
|
||||||
final Task task
|
final Task task
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(
|
TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
|
||||||
new Comparator<ImmutableZkWorker>()
|
new Comparator<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public int compare(
|
public int compare(
|
||||||
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
|
ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
|
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
|
||||||
@ -66,7 +66,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
|
|||||||
sortedWorkers.addAll(zkWorkers.values());
|
sortedWorkers.addAll(zkWorkers.values());
|
||||||
final String minWorkerVer = config.getMinWorkerVersion();
|
final String minWorkerVer = config.getMinWorkerVersion();
|
||||||
|
|
||||||
for (ImmutableZkWorker zkWorker : sortedWorkers) {
|
for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
|
||||||
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
|
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
|
||||||
return Optional.of(zkWorker);
|
return Optional.of(zkWorker);
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ import com.google.common.base.Preconditions;
|
|||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
|
|
||||||
import javax.script.Compilable;
|
import javax.script.Compilable;
|
||||||
@ -39,7 +39,7 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
|
|||||||
{
|
{
|
||||||
public static interface SelectorFunction
|
public static interface SelectorFunction
|
||||||
{
|
{
|
||||||
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task);
|
public String apply(RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final SelectorFunction fnSelector;
|
private final SelectorFunction fnSelector;
|
||||||
@ -61,8 +61,8 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
public Optional<ImmutableWorkerInfo> findWorkerForTask(
|
||||||
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
|
RemoteTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
String worker = fnSelector.apply(config, zkWorkers, task);
|
String worker = fnSelector.apply(config, zkWorkers, task);
|
||||||
|
@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
|||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -46,11 +46,11 @@ public interface WorkerSelectStrategy
|
|||||||
* @param zkWorkers An immutable map of workers to choose from.
|
* @param zkWorkers An immutable map of workers to choose from.
|
||||||
* @param task The task to assign.
|
* @param task The task to assign.
|
||||||
*
|
*
|
||||||
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
|
* @return A {@link io.druid.indexing.overlord.ImmutableWorkerInfo} to run the task if one is available.
|
||||||
*/
|
*/
|
||||||
Optional<ImmutableZkWorker> findWorkerForTask(
|
Optional<ImmutableWorkerInfo> findWorkerForTask(
|
||||||
final RemoteTaskRunnerConfig config,
|
final RemoteTaskRunnerConfig config,
|
||||||
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
|
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
|
||||||
final Task task
|
final Task task
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -21,9 +21,6 @@ package io.druid.indexing.worker;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.joda.time.DateTime;
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A container for worker metadata.
|
* A container for worker metadata.
|
||||||
@ -34,24 +31,19 @@ public class Worker
|
|||||||
private final String ip;
|
private final String ip;
|
||||||
private final int capacity;
|
private final int capacity;
|
||||||
private final String version;
|
private final String version;
|
||||||
private final AtomicReference<DateTime> lastCompletedTaskTime;
|
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public Worker(
|
public Worker(
|
||||||
@JsonProperty("host") String host,
|
@JsonProperty("host") String host,
|
||||||
@JsonProperty("ip") String ip,
|
@JsonProperty("ip") String ip,
|
||||||
@JsonProperty("capacity") int capacity,
|
@JsonProperty("capacity") int capacity,
|
||||||
@JsonProperty("version") String version,
|
@JsonProperty("version") String version
|
||||||
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
|
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.host = host;
|
this.host = host;
|
||||||
this.ip = ip;
|
this.ip = ip;
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
this.lastCompletedTaskTime = new AtomicReference<>(lastCompletedTaskTime == null
|
|
||||||
? DateTime.now()
|
|
||||||
: lastCompletedTaskTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@ -78,22 +70,6 @@ public class Worker
|
|||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
public DateTime getLastCompletedTaskTime()
|
|
||||||
{
|
|
||||||
return lastCompletedTaskTime.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
|
||||||
{
|
|
||||||
lastCompletedTaskTime.set(completedTaskTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isValidVersion(final String minVersion)
|
|
||||||
{
|
|
||||||
return getVersion().compareTo(minVersion) >= 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
@ -104,4 +80,39 @@ public class Worker
|
|||||||
", version='" + version + '\'' +
|
", version='" + version + '\'' +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Worker worker = (Worker) o;
|
||||||
|
|
||||||
|
if (capacity != worker.capacity) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!host.equals(worker.host)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!ip.equals(worker.ip)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return version.equals(worker.version);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = host.hashCode();
|
||||||
|
result = 31 * result + ip.hashCode();
|
||||||
|
result = 31 * result + capacity;
|
||||||
|
result = 31 * result + version.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,8 +80,7 @@ public class WorkerResource
|
|||||||
enabledWorker.getHost(),
|
enabledWorker.getHost(),
|
||||||
enabledWorker.getIp(),
|
enabledWorker.getIp(),
|
||||||
enabledWorker.getCapacity(),
|
enabledWorker.getCapacity(),
|
||||||
DISABLED_VERSION,
|
DISABLED_VERSION
|
||||||
enabledWorker.getLastCompletedTaskTime()
|
|
||||||
);
|
);
|
||||||
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
|
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
|
||||||
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
|
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
|
||||||
|
@ -0,0 +1,182 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
package io.druid.indexing.overlord;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import io.druid.indexing.worker.Worker;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ImmutableWorkerInfoTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testSerde() throws Exception
|
||||||
|
{
|
||||||
|
ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
);
|
||||||
|
ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
final ImmutableWorkerInfo serde = mapper.readValue(
|
||||||
|
mapper.writeValueAsString(workerInfo),
|
||||||
|
ImmutableWorkerInfo.class
|
||||||
|
);
|
||||||
|
Assert.assertEquals(workerInfo, serde);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEqualsAndSerde()
|
||||||
|
{
|
||||||
|
// Everything equal
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), true);
|
||||||
|
|
||||||
|
// different worker same tasks
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker1", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker2", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), false);
|
||||||
|
|
||||||
|
// same worker different task groups
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp3", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), false);
|
||||||
|
|
||||||
|
// same worker different tasks
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker1", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker2", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task3"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), false);
|
||||||
|
|
||||||
|
// same worker different capacity
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker1", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
3,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker2", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), false);
|
||||||
|
|
||||||
|
// same worker different lastCompletedTaskTime
|
||||||
|
assertEqualsAndHashCode(new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker1", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
3,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:01Z")
|
||||||
|
), new ImmutableWorkerInfo(
|
||||||
|
new Worker(
|
||||||
|
"testWorker2", "192.0.0.1", 10, "v1"
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
ImmutableSet.of("grp1", "grp2"),
|
||||||
|
ImmutableSet.of("task1", "task2"),
|
||||||
|
new DateTime("2015-01-01T01:01:02Z")
|
||||||
|
), false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)
|
||||||
|
{
|
||||||
|
if (shouldMatch) {
|
||||||
|
Assert.assertTrue(o1.equals(o2));
|
||||||
|
Assert.assertEquals(o1.hashCode(), o2.hashCode());
|
||||||
|
} else {
|
||||||
|
Assert.assertFalse(o1.equals(o2));
|
||||||
|
Assert.assertNotEquals(o1.hashCode(), o2.hashCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -55,7 +55,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
|
|||||||
import org.apache.curator.test.TestingCluster;
|
import org.apache.curator.test.TestingCluster;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.joda.time.DateTime;
|
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -359,8 +358,8 @@ public class RemoteTaskRunnerTest
|
|||||||
doSetup();
|
doSetup();
|
||||||
|
|
||||||
final Set<String> existingTasks = Sets.newHashSet();
|
final Set<String> existingTasks = Sets.newHashSet();
|
||||||
for (ZkWorker zkWorker : remoteTaskRunner.getZkWorkers()) {
|
for (ImmutableWorkerInfo workerInfo : remoteTaskRunner.getWorkers()) {
|
||||||
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
|
existingTasks.addAll(workerInfo.getRunningTasks());
|
||||||
}
|
}
|
||||||
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
|
Assert.assertEquals("existingTasks", ImmutableSet.of("first", "second"), existingTasks);
|
||||||
|
|
||||||
@ -451,7 +450,7 @@ public class RemoteTaskRunnerTest
|
|||||||
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
|
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
|
||||||
|
|
||||||
// Confirm RTR thinks the worker is disabled.
|
// Confirm RTR thinks the worker is disabled.
|
||||||
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getVersion());
|
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doSetup() throws Exception
|
private void doSetup() throws Exception
|
||||||
@ -492,8 +491,7 @@ public class RemoteTaskRunnerTest
|
|||||||
"worker",
|
"worker",
|
||||||
"localhost",
|
"localhost",
|
||||||
3,
|
3,
|
||||||
"0",
|
"0"
|
||||||
DateTime.now()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
|
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
|
||||||
@ -506,7 +504,7 @@ public class RemoteTaskRunnerTest
|
|||||||
{
|
{
|
||||||
cf.setData().forPath(
|
cf.setData().forPath(
|
||||||
announcementsPath,
|
announcementsPath,
|
||||||
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "", DateTime.now()))
|
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -578,10 +576,10 @@ public class RemoteTaskRunnerTest
|
|||||||
Assert.assertTrue(taskAnnounced(task.getId()));
|
Assert.assertTrue(taskAnnounced(task.getId()));
|
||||||
mockWorkerRunningTask(task);
|
mockWorkerRunningTask(task);
|
||||||
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
||||||
new Predicate<Worker>()
|
new Predicate<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Worker input)
|
public boolean apply(ImmutableWorkerInfo input)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -599,10 +597,10 @@ public class RemoteTaskRunnerTest
|
|||||||
remoteTaskRunner.run(task);
|
remoteTaskRunner.run(task);
|
||||||
Assert.assertTrue(taskAnnounced(task.getId()));
|
Assert.assertTrue(taskAnnounced(task.getId()));
|
||||||
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
||||||
new Predicate<Worker>()
|
new Predicate<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Worker input)
|
public boolean apply(ImmutableWorkerInfo input)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -618,10 +616,10 @@ public class RemoteTaskRunnerTest
|
|||||||
{
|
{
|
||||||
doSetup();
|
doSetup();
|
||||||
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
Collection<Worker> lazyworkers = remoteTaskRunner.markWorkersLazy(
|
||||||
new Predicate<Worker>()
|
new Predicate<ImmutableWorkerInfo>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(Worker input)
|
public boolean apply(ImmutableWorkerInfo input)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ import io.druid.indexing.common.TaskStatus;
|
|||||||
import io.druid.indexing.common.TestTasks;
|
import io.druid.indexing.common.TestTasks;
|
||||||
import io.druid.indexing.common.task.NoopTask;
|
import io.druid.indexing.common.task.NoopTask;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.RemoteTaskRunner;
|
import io.druid.indexing.overlord.RemoteTaskRunner;
|
||||||
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
|
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
|
||||||
import io.druid.indexing.overlord.ZkWorker;
|
import io.druid.indexing.overlord.ZkWorker;
|
||||||
@ -119,7 +120,7 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
);
|
);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).toImmutable()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -155,7 +156,7 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).toImmutable()
|
||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -212,7 +213,7 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).toImmutable()
|
||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -263,10 +264,11 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).toImmutable()
|
||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
|
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
|
||||||
|
.andReturn(
|
||||||
Collections.<Worker>singletonList(
|
Collections.<Worker>singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).getWorker()
|
||||||
)
|
)
|
||||||
@ -305,11 +307,12 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).toImmutable()
|
||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
|
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
|
||||||
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
|
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
|
||||||
|
.andReturn(
|
||||||
Collections.singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(testTask).getWorker()
|
new TestZkWorker(testTask).getWorker()
|
||||||
)
|
)
|
||||||
@ -354,12 +357,13 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
new TestZkWorker(NoopTask.create()).getWorker(),
|
new TestZkWorker(NoopTask.create()).toImmutable(),
|
||||||
new TestZkWorker(NoopTask.create()).getWorker()
|
new TestZkWorker(NoopTask.create()).toImmutable()
|
||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
|
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
|
||||||
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
|
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
|
||||||
|
.andReturn(
|
||||||
Collections.<Worker>emptyList()
|
Collections.<Worker>emptyList()
|
||||||
);
|
);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -398,12 +402,13 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
Collections.<RemoteTaskRunnerWorkItem>emptyList()
|
Collections.<RemoteTaskRunnerWorkItem>emptyList()
|
||||||
).times(3);
|
).times(3);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.<Worker>singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(NoopTask.create(), "h1", "i1", "0").getWorker()
|
new TestZkWorker(NoopTask.create(), "h1", "i1", "0").toImmutable()
|
||||||
)
|
)
|
||||||
).times(3);
|
).times(3);
|
||||||
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
|
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
|
||||||
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<Worker>>anyObject(), EasyMock.anyInt())).andReturn(
|
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
|
||||||
|
.andReturn(
|
||||||
Collections.<Worker>emptyList()
|
Collections.<Worker>emptyList()
|
||||||
);
|
);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -462,8 +467,8 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
)
|
)
|
||||||
).times(2);
|
).times(2);
|
||||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||||
Collections.<Worker>singletonList(
|
Collections.singletonList(
|
||||||
new TestZkWorker(null).getWorker()
|
new TestZkWorker(null).toImmutable()
|
||||||
)
|
)
|
||||||
).times(1);
|
).times(1);
|
||||||
EasyMock.replay(runner);
|
EasyMock.replay(runner);
|
||||||
@ -501,7 +506,7 @@ public class SimpleResourceManagementStrategyTest
|
|||||||
String version
|
String version
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(new Worker(host, ip, 3, version, DateTime.now()), null, new DefaultObjectMapper());
|
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
|
||||||
|
|
||||||
this.testTask = testTask;
|
this.testTask = testTask;
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ import com.google.common.base.Optional;
|
|||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import io.druid.indexing.common.task.NoopTask;
|
import io.druid.indexing.common.task.NoopTask;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
import io.druid.indexing.worker.Worker;
|
import io.druid.indexing.worker.Worker;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
@ -38,18 +38,22 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
{
|
{
|
||||||
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"lhost",
|
"lhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
|
new Worker("lhost", "lhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
),
|
),
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 1,
|
new Worker("localhost", "localhost", 1, "v1"), 1,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
@ -61,7 +65,7 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
ImmutableZkWorker worker = optional.get();
|
ImmutableWorkerInfo worker = optional.get();
|
||||||
Assert.assertEquals("lhost", worker.getWorker().getHost());
|
Assert.assertEquals("lhost", worker.getWorker().getHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,18 +75,22 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
String DISABLED_VERSION = "";
|
String DISABLED_VERSION = "";
|
||||||
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"lhost",
|
"lhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 2,
|
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
),
|
),
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
|
new Worker("enableHost", "enableHost", 10, "v1"), 5,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
@ -94,7 +102,7 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
ImmutableZkWorker worker = optional.get();
|
ImmutableWorkerInfo worker = optional.get();
|
||||||
Assert.assertEquals("enableHost", worker.getWorker().getHost());
|
Assert.assertEquals("enableHost", worker.getWorker().getHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,18 +112,22 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
String DISABLED_VERSION = "";
|
String DISABLED_VERSION = "";
|
||||||
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"lhost",
|
"lhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION, DateTime.now()), 5,
|
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
),
|
),
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("enableHost", "enableHost", 10, "v1", DateTime.now()), 5,
|
new Worker("enableHost", "enableHost", 10, "v1"), 5,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
@ -127,7 +139,7 @@ public class EqualDistributionWorkerSelectStrategyTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
ImmutableZkWorker worker = optional.get();
|
ImmutableWorkerInfo worker = optional.get();
|
||||||
Assert.assertEquals("enableHost", worker.getWorker().getHost());
|
Assert.assertEquals("enableHost", worker.getWorker().getHost());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ import com.google.common.base.Optional;
|
|||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import io.druid.indexing.common.task.NoopTask;
|
import io.druid.indexing.common.task.NoopTask;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||||
import io.druid.indexing.worker.Worker;
|
import io.druid.indexing.worker.Worker;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
@ -41,18 +41,22 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
|
|||||||
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
||||||
);
|
);
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"lhost",
|
"lhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
|
new Worker("lhost", "lhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
),
|
),
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
|
new Worker("localhost", "localhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
@ -64,7 +68,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
ImmutableZkWorker worker = optional.get();
|
ImmutableWorkerInfo worker = optional.get();
|
||||||
Assert.assertEquals("localhost", worker.getWorker().getHost());
|
Assert.assertEquals("localhost", worker.getWorker().getHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,23 +79,27 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
|
|||||||
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
||||||
);
|
);
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"lhost",
|
"lhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("lhost", "lhost", 1, "v1", DateTime.now()), 0,
|
new Worker("lhost", "lhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
),
|
),
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
|
new Worker("localhost", "localhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
);
|
);
|
||||||
ImmutableZkWorker worker = optional.get();
|
ImmutableWorkerInfo worker = optional.get();
|
||||||
Assert.assertEquals("lhost", worker.getWorker().getHost());
|
Assert.assertEquals("lhost", worker.getWorker().getHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,13 +110,15 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
|
|||||||
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
|
||||||
);
|
);
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
|
||||||
new RemoteTaskRunnerConfig(),
|
new RemoteTaskRunnerConfig(),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
"localhost",
|
"localhost",
|
||||||
new ImmutableZkWorker(
|
new ImmutableWorkerInfo(
|
||||||
new Worker("localhost", "localhost", 1, "v1", DateTime.now()), 0,
|
new Worker("localhost", "localhost", 1, "v1"), 0,
|
||||||
Sets.<String>newHashSet()
|
Sets.<String>newHashSet(),
|
||||||
|
Sets.<String>newHashSet(),
|
||||||
|
DateTime.now()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new NoopTask(null, 1, 0, null, null, null)
|
new NoopTask(null, 1, 0, null, null, null)
|
||||||
|
@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
import io.druid.indexing.overlord.ImmutableWorkerInfo;
|
||||||
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
|
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
@ -79,14 +79,14 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
@Test
|
@Test
|
||||||
public void testFindWorkerForTask()
|
public void testFindWorkerForTask()
|
||||||
{
|
{
|
||||||
ImmutableZkWorker worker1 = createMockWorker(1, true, true);
|
ImmutableWorkerInfo worker1 = createMockWorker(1, true, true);
|
||||||
ImmutableZkWorker worker2 = createMockWorker(1, true, true);
|
ImmutableWorkerInfo worker2 = createMockWorker(1, true, true);
|
||||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
|
||||||
"10.0.0.1", worker1,
|
"10.0.0.1", worker1,
|
||||||
"10.0.0.3", worker2
|
"10.0.0.3", worker2
|
||||||
);
|
);
|
||||||
|
|
||||||
ImmutableZkWorker workerForBatchTask = strategy.findWorkerForTask(
|
ImmutableWorkerInfo workerForBatchTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("index_hadoop")
|
createMockTask("index_hadoop")
|
||||||
@ -94,7 +94,7 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
// batch tasks should be sent to worker1
|
// batch tasks should be sent to worker1
|
||||||
Assert.assertEquals(worker1, workerForBatchTask);
|
Assert.assertEquals(worker1, workerForBatchTask);
|
||||||
|
|
||||||
ImmutableZkWorker workerForOtherTask = strategy.findWorkerForTask(
|
ImmutableWorkerInfo workerForOtherTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("other_type")
|
createMockTask("other_type")
|
||||||
@ -106,11 +106,11 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
@Test
|
@Test
|
||||||
public void testIsolationOfBatchWorker()
|
public void testIsolationOfBatchWorker()
|
||||||
{
|
{
|
||||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
|
||||||
"10.0.0.1", createMockWorker(1, true, true),
|
"10.0.0.1", createMockWorker(1, true, true),
|
||||||
"10.0.0.2", createMockWorker(1, true, true)
|
"10.0.0.2", createMockWorker(1, true, true)
|
||||||
);
|
);
|
||||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("other_type")
|
createMockTask("other_type")
|
||||||
@ -121,18 +121,18 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
@Test
|
@Test
|
||||||
public void testNoValidWorker()
|
public void testNoValidWorker()
|
||||||
{
|
{
|
||||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
|
||||||
"10.0.0.1", createMockWorker(1, true, false),
|
"10.0.0.1", createMockWorker(1, true, false),
|
||||||
"10.0.0.4", createMockWorker(1, true, false)
|
"10.0.0.4", createMockWorker(1, true, false)
|
||||||
);
|
);
|
||||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("index_hadoop")
|
createMockTask("index_hadoop")
|
||||||
);
|
);
|
||||||
Assert.assertFalse(workerForBatchTask.isPresent());
|
Assert.assertFalse(workerForBatchTask.isPresent());
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("otherTask")
|
createMockTask("otherTask")
|
||||||
@ -144,18 +144,18 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
@Test
|
@Test
|
||||||
public void testNoWorkerCanRunTask()
|
public void testNoWorkerCanRunTask()
|
||||||
{
|
{
|
||||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
|
||||||
"10.0.0.1", createMockWorker(1, false, true),
|
"10.0.0.1", createMockWorker(1, false, true),
|
||||||
"10.0.0.4", createMockWorker(1, false, true)
|
"10.0.0.4", createMockWorker(1, false, true)
|
||||||
);
|
);
|
||||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("index_hadoop")
|
createMockTask("index_hadoop")
|
||||||
);
|
);
|
||||||
Assert.assertFalse(workerForBatchTask.isPresent());
|
Assert.assertFalse(workerForBatchTask.isPresent());
|
||||||
|
|
||||||
Optional<ImmutableZkWorker> workerForOtherTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForOtherTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("otherTask")
|
createMockTask("otherTask")
|
||||||
@ -168,11 +168,11 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
public void testFillWorkerCapacity()
|
public void testFillWorkerCapacity()
|
||||||
{
|
{
|
||||||
// tasks shoudl be assigned to the worker with maximum currCapacity used until its full
|
// tasks shoudl be assigned to the worker with maximum currCapacity used until its full
|
||||||
ImmutableMap<String, ImmutableZkWorker> workerMap = ImmutableMap.of(
|
ImmutableMap<String, ImmutableWorkerInfo> workerMap = ImmutableMap.of(
|
||||||
"10.0.0.1", createMockWorker(1, true, true),
|
"10.0.0.1", createMockWorker(1, true, true),
|
||||||
"10.0.0.2", createMockWorker(5, true, true)
|
"10.0.0.2", createMockWorker(5, true, true)
|
||||||
);
|
);
|
||||||
Optional<ImmutableZkWorker> workerForBatchTask = strategy.findWorkerForTask(
|
Optional<ImmutableWorkerInfo> workerForBatchTask = strategy.findWorkerForTask(
|
||||||
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
|
||||||
workerMap,
|
workerMap,
|
||||||
createMockTask("index_hadoop")
|
createMockTask("index_hadoop")
|
||||||
@ -189,9 +189,9 @@ public class JavaScriptWorkerSelectStrategyTest
|
|||||||
return mock;
|
return mock;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ImmutableZkWorker createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
|
private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
|
||||||
{
|
{
|
||||||
ImmutableZkWorker worker = EasyMock.createMock(ImmutableZkWorker.class);
|
ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class);
|
||||||
EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
|
EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
|
||||||
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
|
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
|
||||||
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
|
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
|
||||||
|
@ -114,8 +114,7 @@ public class WorkerTaskMonitorTest
|
|||||||
"worker",
|
"worker",
|
||||||
"localhost",
|
"localhost",
|
||||||
3,
|
3,
|
||||||
"0",
|
"0"
|
||||||
DateTime.now()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
workerCuratorCoordinator = new WorkerCuratorCoordinator(
|
workerCuratorCoordinator = new WorkerCuratorCoordinator(
|
||||||
|
@ -74,8 +74,7 @@ public class WorkerResourceTest
|
|||||||
"host",
|
"host",
|
||||||
"ip",
|
"ip",
|
||||||
3,
|
3,
|
||||||
"v1",
|
"v1"
|
||||||
DateTime.now()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
curatorCoordinator = new WorkerCuratorCoordinator(
|
curatorCoordinator = new WorkerCuratorCoordinator(
|
||||||
|
@ -107,8 +107,7 @@ public class CliMiddleManager extends ServerRunnable
|
|||||||
node.getHostAndPort(),
|
node.getHostAndPort(),
|
||||||
config.getIp(),
|
config.getIp(),
|
||||||
config.getCapacity(),
|
config.getCapacity(),
|
||||||
config.getVersion(),
|
config.getVersion()
|
||||||
DateTime.now()
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
Loading…
x
Reference in New Issue
Block a user