Fix bugs in RTR related to blacklisting, change default worker strategy (#4619)

* fix bugs in RTR related to blacklisting, change default worker strategy to equalDistribution

* code review and additional changes

* fix errorprone

* code review changes
Authored by David Lim on 2017-08-03 11:34:45 -06:00; committed by Jonathan Wei
parent f3f2cd35e1
commit dd0b84e766
10 changed files with 378 additions and 123 deletions


@ -193,7 +193,7 @@ Issuing a GET request at the same URL will return the current worker config spec
|Property|Description|Default|
|--------|-----------|-------|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|fillCapacity|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|equalDistribution|
|`autoScaler`|Only used if autoscaling is enabled. See below.|null|
To view the audit history of worker config, issue a GET request to the URL -


@ -26,6 +26,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;
@ -39,6 +40,7 @@ public class ImmutableWorkerInfo
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
private final DateTime blacklistedUntil;
@JsonCreator
public ImmutableWorkerInfo(
@ -46,7 +48,8 @@ public class ImmutableWorkerInfo
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@Nullable @JsonProperty("blacklistedUntil") DateTime blacklistedUntil
)
{
this.worker = worker;
@ -54,6 +57,18 @@ public class ImmutableWorkerInfo
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
this.blacklistedUntil = blacklistedUntil;
}
public ImmutableWorkerInfo(
Worker worker,
int currCapacityUsed,
Set<String> availabilityGroups,
Collection<String> runningTasks,
DateTime lastCompletedTaskTime
)
{
this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}
@JsonProperty("worker")
@ -91,6 +106,12 @@ public class ImmutableWorkerInfo
return lastCompletedTaskTime;
}
@JsonProperty
public DateTime getBlacklistedUntil()
{
return blacklistedUntil;
}
public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
@ -126,8 +147,12 @@ public class ImmutableWorkerInfo
if (!runningTasks.equals(that.runningTasks)) {
return false;
}
return lastCompletedTaskTime.equals(that.lastCompletedTaskTime);
if (!lastCompletedTaskTime.equals(that.lastCompletedTaskTime)) {
return false;
}
return !(blacklistedUntil != null
? !blacklistedUntil.equals(that.blacklistedUntil)
: that.blacklistedUntil != null);
}
@Override
@ -138,6 +163,7 @@ public class ImmutableWorkerInfo
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
result = 31 * result + (blacklistedUntil != null ? blacklistedUntil.hashCode() : 0);
return result;
}
@ -150,6 +176,7 @@ public class ImmutableWorkerInfo
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
", blacklistedUntil=" + blacklistedUntil +
'}';
}
}
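For readers untangling the double-negated ternary added to equals() above: it is simply a null-safe comparison of the two blacklistedUntil values. A small sketch, not part of the commit, of the equivalent check written with java.util.Objects:

```java
import java.util.Objects;

import org.joda.time.DateTime;

final class NullSafeEqualsSketch
{
  // Equivalent to:
  //   !(blacklistedUntil != null ? !blacklistedUntil.equals(that.blacklistedUntil)
  //                              : that.blacklistedUntil != null)
  // i.e. true when both values are null, false when exactly one is, value equality otherwise.
  static boolean blacklistedUntilEquals(DateTime a, DateTime b)
  {
    return Objects.equals(a, b);
  }
}
```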


@ -22,7 +22,6 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -65,12 +64,12 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.initialization.IndexerZkConfig;
@ -161,7 +160,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<>();
// Workers that have been blacklisted.
private final Set<ZkWorker> blackListedWorkers = Collections.synchronizedSet(new HashSet<ZkWorker>());
private final Set<ZkWorker> blackListedWorkers = Collections.synchronizedSet(new HashSet<>());
// task runner listeners
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
@ -328,7 +327,14 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
waitingForMonitor.wait();
}
}
scheduleBlackListedNodesCleanUp();
ScheduledExecutors.scheduleAtFixedRate(
cleanupExec,
Period.ZERO.toStandardDuration(),
config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
() -> checkBlackListedNodes()
);
provisioningService = provisioningStrategy.makeProvisioningService(this);
lifecycleLock.started();
}
@ -678,15 +684,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@VisibleForTesting
static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks)
{
Collections.sort(tasks, new Comparator<RemoteTaskRunnerWorkItem>()
{
@Override
public int compare(RemoteTaskRunnerWorkItem o1, RemoteTaskRunnerWorkItem o2)
{
return o1.getQueueInsertionTime().compareTo(o2.getQueueInsertionTime());
}
}
);
Collections.sort(tasks, Comparator.comparing(RemoteTaskRunnerWorkItem::getQueueInsertionTime));
}
/**
@ -744,8 +742,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
WorkerBehaviorConfig workerConfig = workerConfigRef.get();
WorkerSelectStrategy strategy;
if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
log.warn("No worker selections strategy set. Using default.");
strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
log.info("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
} else {
strategy = workerConfig.getSelectStrategy();
}
@ -787,7 +785,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (immutableZkWorker.isPresent() &&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
}
}
@ -834,7 +832,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
final String worker = theZkWorker.getWorker().getHost();
synchronized (statusLock) {
if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
// the worker might got killed or has been marked as lazy.
// the worker might have been killed or marked as lazy
log.info("Not assigning task to already removed worker[%s]", worker);
return false;
}
@ -1080,45 +1078,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.error(e, "Exception closing worker[%s]!", worker.getHost());
}
zkWorkers.remove(worker.getHost());
checkBlackListedNodes();
}
}
lazyWorkers.remove(worker.getHost());
}
/**
* Schedule a task that will clean the blackListed ZK Workers periodically
*/
private void scheduleBlackListedNodesCleanUp()
{
ScheduledExecutors.scheduleAtFixedRate(
cleanupExec,
Period.ZERO.toStandardDuration(),
config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
long currentTimeStamp = System.currentTimeMillis();
for (ZkWorker zkWorker : blackListedWorkers) {
cleanBlackListedNode(zkWorker, currentTimeStamp);
}
}
}
);
}
public void cleanBlackListedNode(ZkWorker zkWorker, long currentTimeStamp)
{
// Clean blacklisted workers if blacklisted time has elapsed
if(currentTimeStamp - zkWorker.getLastCompletedTaskTime().getMillis() >
config.getWorkerBlackListBackoffTime().toStandardDuration().getMillis()){
// White listing node
log.info("Whitelisting worker [%s]. ", zkWorker);
blackListedWorkers.remove(zkWorker);
}
}
/**
* Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail"
* if they are being run by "worker".
@ -1222,17 +1187,32 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
runningTasks.remove(taskStatus.getId());
// Update success/failure counters
if(taskStatus.isSuccess()){
zkWorker.resetCountinouslyFailedTasksCount();
} else if(taskStatus.isFailure()){
zkWorker.incrementCountinouslyFailedTasksCount();
}
if (zkWorker != null) {
if (taskStatus.isSuccess()) {
zkWorker.resetContinuouslyFailedTasksCount();
if (blackListedWorkers.remove(zkWorker)) {
zkWorker.setBlacklistedUntil(null);
log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
}
} else if (taskStatus.isFailure()) {
zkWorker.incrementContinuouslyFailedTasksCount();
}
// BlackList node if there are too many failures.
if(zkWorker.getCountinouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
blackListedWorkers.size() <=
zkWorkers.size()*(config.getMaxPercentageBlacklistWorkers()/100)){
blackListedWorkers.add(zkWorker);
// Blacklist node if there are too many failures.
synchronized (blackListedWorkers) {
if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
zkWorker.setBlacklistedUntil(DateTime.now().plus(config.getWorkerBlackListBackoffTime()));
if (blackListedWorkers.add(zkWorker)) {
log.info(
"Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
zkWorker.getWorker(),
zkWorker.getBlacklistedUntil(),
zkWorker.getContinuouslyFailedTasksCount()
);
}
}
}
}
// Notify interested parties
@ -1297,41 +1277,71 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
{
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableWorkerInfo apply(ZkWorker input)
{
return input.toImmutable();
}
}
)
);
return ImmutableList.copyOf(Collections2.transform(workers, ZkWorker::toImmutable));
}
private static ImmutableList<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
@Override
public Worker apply(ZkWorker input)
{
return input.getWorker();
}
}
)
);
return ImmutableList.copyOf(Collections2.transform(workers, ZkWorker::getWorker));
}
public Collection<ImmutableWorkerInfo> getBlackListedWorkers()
{
return getImmutableWorkerFromZK(blackListedWorkers);
synchronized (blackListedWorkers) {
return getImmutableWorkerFromZK(blackListedWorkers);
}
}
private boolean shouldRemoveNodeFromBlackList(ZkWorker zkWorker)
{
if (blackListedWorkers.size() > zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) {
log.info(
"Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]",
zkWorker.getWorker(),
config.getMaxPercentageBlacklistWorkers()
);
return true;
}
long remainingMillis = zkWorker.getBlacklistedUntil().getMillis() - getCurrentTimeMillis();
if (remainingMillis <= 0) {
log.info("Removing [%s] from blacklist because backoff time elapsed", zkWorker.getWorker());
return true;
}
log.info("[%s] still blacklisted for [%,ds]", zkWorker.getWorker(), remainingMillis / 1000);
return false;
}
@VisibleForTesting
void checkBlackListedNodes()
{
boolean shouldRunPendingTasks = false;
// must be synchronized while iterating:
// https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#synchronizedSet-java.util.Set-
synchronized (blackListedWorkers) {
for (Iterator<ZkWorker> iterator = blackListedWorkers.iterator(); iterator.hasNext(); ) {
ZkWorker zkWorker = iterator.next();
if (shouldRemoveNodeFromBlackList(zkWorker)) {
iterator.remove();
zkWorker.resetContinuouslyFailedTasksCount();
zkWorker.setBlacklistedUntil(null);
shouldRunPendingTasks = true;
}
}
}
if (shouldRunPendingTasks) {
runPendingTasks();
}
}
@VisibleForTesting
protected long getCurrentTimeMillis()
{
return System.currentTimeMillis();
}
@VisibleForTesting
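The blacklisting condition added above only admits another worker while `blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1` holds, which caps the blacklist at floor(workerCount * percentage / 100). A standalone sketch, not part of the commit, that reproduces the arithmetic for the scenarios exercised by the new tests:

```java
// Standalone sketch of the blacklist cap implied by the condition in RemoteTaskRunner:
// a worker can only be added to the blacklist while
//   blackListedWorkers.size() <= workerCount * (maxPercentageBlacklistWorkers / 100.0) - 1
// which caps the blacklist at floor(workerCount * percentage / 100).
public class BlacklistCapSketch
{
  static int maxBlacklistedWorkers(int workerCount, int maxPercentageBlacklistWorkers)
  {
    int blacklisted = 0;
    // keep "adding" workers while the RemoteTaskRunner condition would still allow it
    while (blacklisted <= workerCount * (maxPercentageBlacklistWorkers / 100.0) - 1) {
      blacklisted++;
    }
    return blacklisted;
  }

  public static void main(String[] args)
  {
    System.out.println(maxBlacklistedWorkers(2, 25));  // 0 -> no worker ever blacklisted (25% test below)
    System.out.println(maxBlacklistedWorkers(2, 50));  // 1 -> at most one of two workers (50% test below)
    System.out.println(maxBlacklistedWorkers(10, 20)); // 2
  }
}
```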


@ -51,8 +51,10 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
private AtomicInteger countinouslyFailedTasksCount = new AtomicInteger(0);
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<>(new DateTime());
private AtomicReference<DateTime> blacklistedUntil = new AtomicReference<>();
private AtomicInteger continuouslyFailedTasksCount = new AtomicInteger(0);
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
@ -134,6 +136,12 @@ public class ZkWorker implements Closeable
return lastCompletedTaskTime.get();
}
@JsonProperty
public DateTime getBlacklistedUntil()
{
return blacklistedUntil.get();
}
public boolean isRunningTask(String taskId)
{
return getRunningTasks().containsKey(taskId);
@ -158,10 +166,22 @@ public class ZkWorker implements Closeable
lastCompletedTaskTime.set(completedTaskTime);
}
public void setBlacklistedUntil(DateTime blacklistedUntil)
{
this.blacklistedUntil.set(blacklistedUntil);
}
public ImmutableWorkerInfo toImmutable()
{
return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get());
return new ImmutableWorkerInfo(
worker.get(),
getCurrCapacityUsed(),
getAvailabilityGroups(),
getRunningTaskIds(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);
}
@Override
@ -170,19 +190,19 @@ public class ZkWorker implements Closeable
statusCache.close();
}
public int getCountinouslyFailedTasksCount()
public int getContinuouslyFailedTasksCount()
{
return countinouslyFailedTasksCount.get();
return continuouslyFailedTasksCount.get();
}
public void resetCountinouslyFailedTasksCount()
public void resetContinuouslyFailedTasksCount()
{
this.countinouslyFailedTasksCount.set(0);
this.continuouslyFailedTasksCount.set(0);
}
public void incrementCountinouslyFailedTasksCount()
public void incrementContinuouslyFailedTasksCount()
{
this.countinouslyFailedTasksCount.incrementAndGet();
this.continuouslyFailedTasksCount.incrementAndGet();
}
@Override
@ -191,6 +211,7 @@ public class ZkWorker implements Closeable
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
", blacklistedUntil=" + blacklistedUntil +
'}';
}
}
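blacklistedUntil is written from the task-completion path and read by the periodic cleanup job on the cleanup executor, consistent with keeping it in an AtomicReference like the rest of the mutable ZkWorker state. A minimal sketch of that pattern, not Druid code:

```java
import java.util.concurrent.atomic.AtomicReference;

import org.joda.time.DateTime;
import org.joda.time.Period;

// Minimal sketch (not Druid code): one thread records the backoff deadline,
// another thread checks it later without extra locking.
final class BlacklistDeadlineSketch
{
  private final AtomicReference<DateTime> blacklistedUntil = new AtomicReference<>();

  void blacklistFor(Period backoff)
  {
    blacklistedUntil.set(DateTime.now().plus(backoff));
  }

  void whitelist()
  {
    blacklistedUntil.set(null);
  }

  boolean stillBlacklisted(long nowMillis)
  {
    DateTime until = blacklistedUntil.get();
    return until != null && until.getMillis() > nowMillis;
  }
}
```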


@ -65,7 +65,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
@JsonProperty
@Max(100)
@Min(0)
private double maxPercentageBlacklistWorkers = 20;
private int maxPercentageBlacklistWorkers = 20;
public Period getTaskAssignmentTimeout()
{
@ -123,7 +123,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
this.workerBlackListCleanupPeriod = workerBlackListCleanupPeriod;
}
public double getMaxPercentageBlacklistWorkers()
public int getMaxPercentageBlacklistWorkers()
{
return maxPercentageBlacklistWorkers;
}
@ -188,7 +188,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
result = 31 * result + maxRetriesBeforeBlacklist;
result = 31 * result + workerBlackListBackoffTime.hashCode();
result = 31 * result + workerBlackListCleanupPeriod.hashCode();
result = 31 * result + (int)maxPercentageBlacklistWorkers;
result = 31 * result + maxPercentageBlacklistWorkers;
return result;
}
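Since maxPercentageBlacklistWorkers is now an int, dividing it by the integer literal 100 would floor to zero for any value below 100 and the blacklist threshold could never be reached; that is why the RemoteTaskRunner code above divides by 100.0. A two-line demonstration, not part of the commit:

```java
// Why RemoteTaskRunner divides by 100.0: with an int percentage, integer division
// by 100 floors to 0 for any value below 100, so no worker could ever be blacklisted.
public class PercentageDivisionSketch
{
  public static void main(String[] args)
  {
    int maxPercentageBlacklistWorkers = 20;
    System.out.println(maxPercentageBlacklistWorkers / 100);   // 0   (integer division)
    System.out.println(maxPercentageBlacklistWorkers / 100.0); // 0.2
  }
}
```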


@ -29,7 +29,7 @@ import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy();
public static WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy();
public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
public static WorkerBehaviorConfig defaultConfig()
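Deployments that want to keep the old scheduling behavior after this change can still select fillCapacity explicitly in their worker behavior config instead of relying on the new equalDistribution default. A hypothetical sketch, assuming the two-argument (selectStrategy, autoScaler) constructor that defaultConfig() uses and import paths taken from elsewhere in this diff:

```java
import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;

// Hypothetical: pin the pre-#4619 behavior by configuring fillCapacity explicitly
// rather than falling back to DEFAULT_STRATEGY (now equalDistribution).
public class LegacyWorkerConfigSketch
{
  static WorkerBehaviorConfig legacyFillCapacityConfig()
  {
    return new WorkerBehaviorConfig(
        new FillCapacityWorkerSelectStrategy(),
        new NoopAutoScaler()
    );
  }
}
```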


@ -167,6 +167,26 @@ public class ImmutableWorkerInfoTest
new DateTime("2015-01-01T01:01:02Z")
), false);
// same worker different blacklistedUntil
assertEqualsAndHashCode(new ImmutableWorkerInfo(
new Worker(
"http", "testWorker1", "192.0.0.1", 10, "v1"
),
3,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:01Z"),
new DateTime("2017-07-30")
), new ImmutableWorkerInfo(
new Worker(
"http", "testWorker2", "192.0.0.1", 10, "v1"
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
new DateTime("2015-01-01T01:01:02Z"),
new DateTime("2017-07-31")
), false);
}
private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)


@ -59,8 +59,8 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
@Test(timeout = 60_000)
public void testConcurrency() throws Exception
{
rtrTestUtils.makeWorker("worker0");
rtrTestUtils.makeWorker("worker1");
rtrTestUtils.makeWorker("worker0", 3);
rtrTestUtils.makeWorker("worker1", 3);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(new Period("PT3600S"))


@ -439,7 +439,7 @@ public class RemoteTaskRunnerTest
private void makeWorker() throws Exception
{
worker = rtrTestUtils.makeWorker(workerHost);
worker = rtrTestUtils.makeWorker(workerHost, 3);
}
private void disableWorker() throws Exception
@ -609,7 +609,7 @@ public class RemoteTaskRunnerTest
.withQueueInsertionTime(new DateTime("2015-01-01T00:00:01Z"));
ArrayList<RemoteTaskRunnerWorkItem> workItems = Lists.newArrayList(item1, item2, item3);
RemoteTaskRunner.sortByInsertionTime(workItems);
Assert.assertEquals(item3,workItems.get(0));
Assert.assertEquals(item3, workItems.get(0));
Assert.assertEquals(item2, workItems.get(1));
Assert.assertEquals(item1, workItems.get(2));
}
@ -619,7 +619,11 @@ public class RemoteTaskRunnerTest
{
Period timeoutPeriod = Period.millis(1000);
makeWorker();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(timeoutPeriod));
RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
rtrConfig.setMaxPercentageBlacklistWorkers(100);
makeRemoteTaskRunner(rtrConfig);
TestRealtimeTask task1 = new TestRealtimeTask(
"realtime1",
@ -635,7 +639,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(1,
remoteTaskRunner.findWorkerRunningTask(task1.getId()).getCountinouslyFailedTasksCount());
remoteTaskRunner.findWorkerRunningTask(task1.getId()).getContinuouslyFailedTasksCount());
TestRealtimeTask task2 = new TestRealtimeTask(
"realtime2",
@ -650,16 +654,25 @@ public class RemoteTaskRunnerTest
mockWorkerCompleteFailedTask(task2);
Assert.assertTrue(taskFuture2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(2,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount());
Assert.assertEquals(
2,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount()
);
remoteTaskRunner.cleanBlackListedNode(remoteTaskRunner.findWorkerRunningTask(task2.getId()),
System.currentTimeMillis() + 2*timeoutPeriod.toStandardDuration().getMillis());
((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner)
.setCurrentTimeMillis(System.currentTimeMillis());
remoteTaskRunner.checkBlackListedNodes();
// After backOffTime the nodes are whitelisted
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner)
.setCurrentTimeMillis(System.currentTimeMillis() + 2 * timeoutPeriod.toStandardDuration().getMillis());
remoteTaskRunner.checkBlackListedNodes();
// After backOffTime the nodes are removed from blacklist
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(2,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount());
Assert.assertEquals(0,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount());
TestRealtimeTask task3 = new TestRealtimeTask(
"realtime3",
@ -675,6 +688,126 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0,
remoteTaskRunner.findWorkerRunningTask(task3.getId()).getCountinouslyFailedTasksCount());
remoteTaskRunner.findWorkerRunningTask(task3.getId()).getContinuouslyFailedTasksCount());
}
/**
* With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after
* exceeding maxRetriesBeforeBlacklist.
*/
@Test
public void testBlacklistZKWorkers25Percent() throws Exception
{
Period timeoutPeriod = Period.millis(1000);
rtrTestUtils.makeWorker("worker", 10);
rtrTestUtils.makeWorker("worker2", 10);
RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
rtrConfig.setMaxPercentageBlacklistWorkers(25);
makeRemoteTaskRunner(rtrConfig);
for (int i = 1; i < 13; i++) {
String taskId = String.format("rt-%d", i);
TestRealtimeTask task = new TestRealtimeTask(
taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), jsonMapper
);
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
rtrTestUtils.taskAnnounced(i % 2 == 0 ? "worker2" : "worker", task.getId());
rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 ? "worker2" : "worker", task);
rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 ? "worker2" : "worker", task);
Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
);
}
}
/**
* With 2 workers and maxPercentageBlacklistWorkers(50), one worker should get blacklisted after the second failure
* and the second worker should never be blacklisted even after exceeding maxRetriesBeforeBlacklist.
*/
@Test
public void testBlacklistZKWorkers50Percent() throws Exception
{
Period timeoutPeriod = Period.millis(1000);
rtrTestUtils.makeWorker("worker", 10);
rtrTestUtils.makeWorker("worker2", 10);
RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
rtrConfig.setMaxPercentageBlacklistWorkers(50);
makeRemoteTaskRunner(rtrConfig);
for (int i = 1; i < 13; i++) {
String taskId = String.format("rt-%d", i);
TestRealtimeTask task = new TestRealtimeTask(
taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), jsonMapper
);
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
rtrTestUtils.taskAnnounced(i % 2 == 0 || i > 4 ? "worker2" : "worker", task.getId());
rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task);
rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task);
Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
i > 4 ? i - 2 : ((i + 1) / 2),
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
);
}
}
@Test
public void testSuccessfulTaskOnBlacklistedWorker() throws Exception
{
Period timeoutPeriod = Period.millis(1000);
makeWorker();
RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
rtrConfig.setMaxPercentageBlacklistWorkers(100);
makeRemoteTaskRunner(rtrConfig);
TestRealtimeTask task1 = new TestRealtimeTask(
"realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), jsonMapper
);
TestRealtimeTask task2 = new TestRealtimeTask(
"realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), jsonMapper
);
TestRealtimeTask task3 = new TestRealtimeTask(
"realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), jsonMapper
);
Future<TaskStatus> taskFuture1 = remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
mockWorkerCompleteFailedTask(task1);
Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
Assert.assertTrue(taskAnnounced(task2.getId()));
mockWorkerRunningTask(task2);
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);
mockWorkerCompleteFailedTask(task3);
Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
mockWorkerCompleteSuccessfulTask(task2);
Assert.assertTrue(taskFuture2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
}
}


@ -21,7 +21,9 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.metamx.http.client.HttpClient;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.PathChildrenCacheFactory;
@ -46,6 +48,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -113,7 +116,7 @@ public class RemoteTaskRunnerTestUtils
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner(
jsonMapper,
config,
new IndexerZkConfig(
@ -138,13 +141,13 @@ public class RemoteTaskRunnerTestUtils
return remoteTaskRunner;
}
Worker makeWorker(final String workerId) throws Exception
Worker makeWorker(final String workerId, final int capacity) throws Exception
{
Worker worker = new Worker(
"http",
workerId,
workerId,
3,
capacity,
"0"
);
@ -215,4 +218,45 @@ public class RemoteTaskRunnerTestUtils
}
);
}
public static class TestableRemoteTaskRunner extends RemoteTaskRunner
{
private long currentTimeMillis = System.currentTimeMillis();
public TestableRemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
IndexerZkConfig indexerZkConfig,
CuratorFramework cf,
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
super(
jsonMapper,
config,
indexerZkConfig,
cf,
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
cleanupExec,
provisioningStrategy
);
}
void setCurrentTimeMillis(long currentTimeMillis)
{
this.currentTimeMillis = currentTimeMillis;
}
@Override
protected long getCurrentTimeMillis()
{
return currentTimeMillis;
}
}
}