From dd0b84e76606f76d46d3278d662a1e19550ac955 Mon Sep 17 00:00:00 2001 From: David Lim Date: Thu, 3 Aug 2017 11:34:45 -0600 Subject: [PATCH] Fix bugs in RTR related to blacklisting, change default worker strategy (#4619) * fix bugs in RTR related to blacklisting, change default worker strategy to equalDistribution * code review and additional changes * fix errorprone * code review changes --- .../content/configuration/indexing-service.md | 2 +- .../overlord/ImmutableWorkerInfo.java | 33 +++- .../indexing/overlord/RemoteTaskRunner.java | 186 +++++++++--------- .../io/druid/indexing/overlord/ZkWorker.java | 39 +++- .../config/RemoteTaskRunnerConfig.java | 6 +- .../overlord/setup/WorkerBehaviorConfig.java | 2 +- .../overlord/ImmutableWorkerInfoTest.java | 20 ++ ...kRunnerRunPendingTasksConcurrencyTest.java | 4 +- .../overlord/RemoteTaskRunnerTest.java | 157 +++++++++++++-- .../overlord/RemoteTaskRunnerTestUtils.java | 52 ++++- 10 files changed, 378 insertions(+), 123 deletions(-) diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 36d6f8603a8..20388653af4 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -193,7 +193,7 @@ Issuing a GET request at the same URL will return the current worker config spec |Property|Description|Default| |--------|-----------|-------| -|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|fillCapacity| +|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|equalDistribution| |`autoScaler`|Only used if autoscaling is enabled. See below.|null| To view the audit history of worker config issue a GET request to the URL - diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java index ca9c72080ec..86836495d0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.worker.Worker; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Collection; import java.util.Set; @@ -39,6 +40,7 @@ public class ImmutableWorkerInfo private final ImmutableSet availabilityGroups; private final ImmutableSet runningTasks; private final DateTime lastCompletedTaskTime; + private final DateTime blacklistedUntil; @JsonCreator public ImmutableWorkerInfo( @@ -46,7 +48,8 @@ public class ImmutableWorkerInfo @JsonProperty("currCapacityUsed") int currCapacityUsed, @JsonProperty("availabilityGroups") Set availabilityGroups, @JsonProperty("runningTasks") Collection runningTasks, - @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime + @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime, + @Nullable @JsonProperty("blacklistedUntil") DateTime blacklistedUntil ) { this.worker = worker; @@ -54,6 +57,18 @@ public class ImmutableWorkerInfo this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); this.runningTasks = ImmutableSet.copyOf(runningTasks); this.lastCompletedTaskTime = lastCompletedTaskTime; + this.blacklistedUntil = blacklistedUntil; + } + + public ImmutableWorkerInfo( + Worker worker, + int currCapacityUsed, + Set availabilityGroups, + Collection runningTasks, + DateTime lastCompletedTaskTime + ) + { + this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null); } @JsonProperty("worker") @@ -91,6 +106,12 @@ public class ImmutableWorkerInfo return lastCompletedTaskTime; } + @JsonProperty + public DateTime getBlacklistedUntil() + { + return blacklistedUntil; + } + public boolean isValidVersion(String minVersion) { return worker.getVersion().compareTo(minVersion) >= 0; @@ -126,8 +147,12 @@ public class ImmutableWorkerInfo if (!runningTasks.equals(that.runningTasks)) { return false; } - return lastCompletedTaskTime.equals(that.lastCompletedTaskTime); - + if (!lastCompletedTaskTime.equals(that.lastCompletedTaskTime)) { + return false; + } + return !(blacklistedUntil != null + ? !blacklistedUntil.equals(that.blacklistedUntil) + : that.blacklistedUntil != null); } @Override @@ -138,6 +163,7 @@ public class ImmutableWorkerInfo result = 31 * result + availabilityGroups.hashCode(); result = 31 * result + runningTasks.hashCode(); result = 31 * result + lastCompletedTaskTime.hashCode(); + result = 31 * result + (blacklistedUntil != null ? blacklistedUntil.hashCode() : 0); return result; } @@ -150,6 +176,7 @@ public class ImmutableWorkerInfo ", availabilityGroups=" + availabilityGroups + ", runningTasks=" + runningTasks + ", lastCompletedTaskTime=" + lastCompletedTaskTime + + ", blacklistedUntil=" + blacklistedUntil + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 64bdf7a5601..46ac4b33d6f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -22,7 +22,6 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -65,12 +64,12 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; -import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.server.initialization.IndexerZkConfig; @@ -161,7 +160,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>(); // Workers that have been blacklisted. - private final Set blackListedWorkers = Collections.synchronizedSet(new HashSet()); + private final Set blackListedWorkers = Collections.synchronizedSet(new HashSet<>()); // task runner listeners private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); @@ -328,7 +327,14 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer waitingForMonitor.wait(); } } - scheduleBlackListedNodesCleanUp(); + + ScheduledExecutors.scheduleAtFixedRate( + cleanupExec, + Period.ZERO.toStandardDuration(), + config.getWorkerBlackListCleanupPeriod().toStandardDuration(), + () -> checkBlackListedNodes() + ); + provisioningService = provisioningStrategy.makeProvisioningService(this); lifecycleLock.started(); } @@ -678,15 +684,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer @VisibleForTesting static void sortByInsertionTime(List tasks) { - Collections.sort(tasks, new Comparator() - { - @Override - public int compare(RemoteTaskRunnerWorkItem o1, RemoteTaskRunnerWorkItem o2) - { - return o1.getQueueInsertionTime().compareTo(o2.getQueueInsertionTime()); - } - } - ); + Collections.sort(tasks, Comparator.comparing(RemoteTaskRunnerWorkItem::getQueueInsertionTime)); } /** @@ -744,8 +742,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer WorkerBehaviorConfig workerConfig = workerConfigRef.get(); WorkerSelectStrategy strategy; if (workerConfig == null || workerConfig.getSelectStrategy() == null) { - log.warn("No worker selections strategy set. Using default."); strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY; + log.info("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName()); } else { strategy = workerConfig.getSelectStrategy(); } @@ -787,7 +785,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer if (immutableZkWorker.isPresent() && workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) - == null) { + == null) { assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); } } @@ -834,7 +832,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer final String worker = theZkWorker.getWorker().getHost(); synchronized (statusLock) { if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) { - // the worker might got killed or has been marked as lazy. + // the worker might have been killed or marked as lazy log.info("Not assigning task to already removed worker[%s]", worker); return false; } @@ -1080,45 +1078,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer log.error(e, "Exception closing worker[%s]!", worker.getHost()); } zkWorkers.remove(worker.getHost()); + checkBlackListedNodes(); } } lazyWorkers.remove(worker.getHost()); } - /** - * Schedule a task that will clean the blackListed ZK Workers periodically - */ - private void scheduleBlackListedNodesCleanUp() - { - ScheduledExecutors.scheduleAtFixedRate( - cleanupExec, - Period.ZERO.toStandardDuration(), - config.getWorkerBlackListCleanupPeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - long currentTimeStamp = System.currentTimeMillis(); - for (ZkWorker zkWorker : blackListedWorkers) { - cleanBlackListedNode(zkWorker, currentTimeStamp); - } - } - } - ); - } - - public void cleanBlackListedNode(ZkWorker zkWorker, long currentTimeStamp) - { - // Clean blacklisted workers if blacklisted time has elapsed - if(currentTimeStamp - zkWorker.getLastCompletedTaskTime().getMillis() > - config.getWorkerBlackListBackoffTime().toStandardDuration().getMillis()){ - // White listing node - log.info("Whitelisting worker [%s]. ", zkWorker); - blackListedWorkers.remove(zkWorker); - } - } - /** * Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail" * if they are being run by "worker". @@ -1222,17 +1187,32 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer runningTasks.remove(taskStatus.getId()); // Update success/failure counters - if(taskStatus.isSuccess()){ - zkWorker.resetCountinouslyFailedTasksCount(); - } else if(taskStatus.isFailure()){ - zkWorker.incrementCountinouslyFailedTasksCount(); - } + if (zkWorker != null) { + if (taskStatus.isSuccess()) { + zkWorker.resetContinuouslyFailedTasksCount(); + if (blackListedWorkers.remove(zkWorker)) { + zkWorker.setBlacklistedUntil(null); + log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker()); + } + } else if (taskStatus.isFailure()) { + zkWorker.incrementContinuouslyFailedTasksCount(); + } - // BlackList node if there are too many failures. - if(zkWorker.getCountinouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() && - blackListedWorkers.size() <= - zkWorkers.size()*(config.getMaxPercentageBlacklistWorkers()/100)){ - blackListedWorkers.add(zkWorker); + // Blacklist node if there are too many failures. + synchronized (blackListedWorkers) { + if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() && + blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) { + zkWorker.setBlacklistedUntil(DateTime.now().plus(config.getWorkerBlackListBackoffTime())); + if (blackListedWorkers.add(zkWorker)) { + log.info( + "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.", + zkWorker.getWorker(), + zkWorker.getBlacklistedUntil(), + zkWorker.getContinuouslyFailedTasksCount() + ); + } + } + } } // Notify interested parties @@ -1297,41 +1277,71 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer private static ImmutableList getImmutableWorkerFromZK(Collection workers) { - return ImmutableList.copyOf( - Collections2.transform( - workers, - new Function() - { - @Override - public ImmutableWorkerInfo apply(ZkWorker input) - { - return input.toImmutable(); - } - } - ) - ); + return ImmutableList.copyOf(Collections2.transform(workers, ZkWorker::toImmutable)); } private static ImmutableList getWorkerFromZK(Collection workers) { - return ImmutableList.copyOf( - Collections2.transform( - workers, - new Function() - { - @Override - public Worker apply(ZkWorker input) - { - return input.getWorker(); - } - } - ) - ); + return ImmutableList.copyOf(Collections2.transform(workers, ZkWorker::getWorker)); } public Collection getBlackListedWorkers() { - return getImmutableWorkerFromZK(blackListedWorkers); + synchronized (blackListedWorkers) { + return getImmutableWorkerFromZK(blackListedWorkers); + } + } + + private boolean shouldRemoveNodeFromBlackList(ZkWorker zkWorker) + { + if (blackListedWorkers.size() > zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) { + log.info( + "Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]", + zkWorker.getWorker(), + config.getMaxPercentageBlacklistWorkers() + ); + + return true; + } + + long remainingMillis = zkWorker.getBlacklistedUntil().getMillis() - getCurrentTimeMillis(); + if (remainingMillis <= 0) { + log.info("Removing [%s] from blacklist because backoff time elapsed", zkWorker.getWorker()); + return true; + } + + log.info("[%s] still blacklisted for [%,ds]", zkWorker.getWorker(), remainingMillis / 1000); + return false; + } + + @VisibleForTesting + void checkBlackListedNodes() + { + boolean shouldRunPendingTasks = false; + + // must be synchronized while iterating: + // https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#synchronizedSet-java.util.Set- + synchronized (blackListedWorkers) { + for (Iterator iterator = blackListedWorkers.iterator(); iterator.hasNext(); ) { + ZkWorker zkWorker = iterator.next(); + if (shouldRemoveNodeFromBlackList(zkWorker)) { + iterator.remove(); + zkWorker.resetContinuouslyFailedTasksCount(); + zkWorker.setBlacklistedUntil(null); + shouldRunPendingTasks = true; + } + } + } + + if (shouldRunPendingTasks) { + runPendingTasks(); + } + } + + @VisibleForTesting + protected long getCurrentTimeMillis() + { + return System.currentTimeMillis(); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index 5ef9ea0dacb..1ce283254fe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -51,8 +51,10 @@ public class ZkWorker implements Closeable private final Function cacheConverter; private AtomicReference worker; - private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime()); - private AtomicInteger countinouslyFailedTasksCount = new AtomicInteger(0); + private AtomicReference lastCompletedTaskTime = new AtomicReference<>(new DateTime()); + private AtomicReference blacklistedUntil = new AtomicReference<>(); + + private AtomicInteger continuouslyFailedTasksCount = new AtomicInteger(0); public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { @@ -134,6 +136,12 @@ public class ZkWorker implements Closeable return lastCompletedTaskTime.get(); } + @JsonProperty + public DateTime getBlacklistedUntil() + { + return blacklistedUntil.get(); + } + public boolean isRunningTask(String taskId) { return getRunningTasks().containsKey(taskId); @@ -158,10 +166,22 @@ public class ZkWorker implements Closeable lastCompletedTaskTime.set(completedTaskTime); } + public void setBlacklistedUntil(DateTime blacklistedUntil) + { + this.blacklistedUntil.set(blacklistedUntil); + } + public ImmutableWorkerInfo toImmutable() { - return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get()); + return new ImmutableWorkerInfo( + worker.get(), + getCurrCapacityUsed(), + getAvailabilityGroups(), + getRunningTaskIds(), + lastCompletedTaskTime.get(), + blacklistedUntil.get() + ); } @Override @@ -170,19 +190,19 @@ public class ZkWorker implements Closeable statusCache.close(); } - public int getCountinouslyFailedTasksCount() + public int getContinuouslyFailedTasksCount() { - return countinouslyFailedTasksCount.get(); + return continuouslyFailedTasksCount.get(); } - public void resetCountinouslyFailedTasksCount() + public void resetContinuouslyFailedTasksCount() { - this.countinouslyFailedTasksCount.set(0); + this.continuouslyFailedTasksCount.set(0); } - public void incrementCountinouslyFailedTasksCount() + public void incrementContinuouslyFailedTasksCount() { - this.countinouslyFailedTasksCount.incrementAndGet(); + this.continuouslyFailedTasksCount.incrementAndGet(); } @Override @@ -191,6 +211,7 @@ public class ZkWorker implements Closeable return "ZkWorker{" + "worker=" + worker + ", lastCompletedTaskTime=" + lastCompletedTaskTime + + ", blacklistedUntil=" + blacklistedUntil + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index 535d6f8f29a..3ebc93e5720 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -65,7 +65,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig @JsonProperty @Max(100) @Min(0) - private double maxPercentageBlacklistWorkers = 20; + private int maxPercentageBlacklistWorkers = 20; public Period getTaskAssignmentTimeout() { @@ -123,7 +123,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig this.workerBlackListCleanupPeriod = workerBlackListCleanupPeriod; } - public double getMaxPercentageBlacklistWorkers() + public int getMaxPercentageBlacklistWorkers() { return maxPercentageBlacklistWorkers; } @@ -188,7 +188,7 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig result = 31 * result + maxRetriesBeforeBlacklist; result = 31 * result + workerBlackListBackoffTime.hashCode(); result = 31 * result + workerBlackListCleanupPeriod.hashCode(); - result = 31 * result + (int)maxPercentageBlacklistWorkers; + result = 31 * result + maxPercentageBlacklistWorkers; return result; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java index 57e571659d5..0e58fc39c5b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java @@ -29,7 +29,7 @@ import io.druid.indexing.overlord.autoscaling.NoopAutoScaler; public class WorkerBehaviorConfig { public static final String CONFIG_KEY = "worker.config"; - public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy(); + public static WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(); public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler(); public static WorkerBehaviorConfig defaultConfig() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java index f5e4eb0d58e..213fa3f5af3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/ImmutableWorkerInfoTest.java @@ -167,6 +167,26 @@ public class ImmutableWorkerInfoTest new DateTime("2015-01-01T01:01:02Z") ), false); + // same worker different blacklistedUntil + assertEqualsAndHashCode(new ImmutableWorkerInfo( + new Worker( + "http", "testWorker1", "192.0.0.1", 10, "v1" + ), + 3, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:01Z"), + new DateTime("2017-07-30") + ), new ImmutableWorkerInfo( + new Worker( + "http", "testWorker2", "192.0.0.1", 10, "v1" + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + new DateTime("2015-01-01T01:01:02Z"), + new DateTime("2017-07-31") + ), false); } private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java index fb870d279f8..50e492b4ffb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java @@ -59,8 +59,8 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest @Test(timeout = 60_000) public void testConcurrency() throws Exception { - rtrTestUtils.makeWorker("worker0"); - rtrTestUtils.makeWorker("worker1"); + rtrTestUtils.makeWorker("worker0", 3); + rtrTestUtils.makeWorker("worker1", 3); remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( new TestRemoteTaskRunnerConfig(new Period("PT3600S")) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 3aee1a97c91..766163932f2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -439,7 +439,7 @@ public class RemoteTaskRunnerTest private void makeWorker() throws Exception { - worker = rtrTestUtils.makeWorker(workerHost); + worker = rtrTestUtils.makeWorker(workerHost, 3); } private void disableWorker() throws Exception @@ -609,7 +609,7 @@ public class RemoteTaskRunnerTest .withQueueInsertionTime(new DateTime("2015-01-01T00:00:01Z")); ArrayList workItems = Lists.newArrayList(item1, item2, item3); RemoteTaskRunner.sortByInsertionTime(workItems); - Assert.assertEquals(item3,workItems.get(0)); + Assert.assertEquals(item3, workItems.get(0)); Assert.assertEquals(item2, workItems.get(1)); Assert.assertEquals(item1, workItems.get(2)); } @@ -619,7 +619,11 @@ public class RemoteTaskRunnerTest { Period timeoutPeriod = Period.millis(1000); makeWorker(); - makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(timeoutPeriod)); + + RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod); + rtrConfig.setMaxPercentageBlacklistWorkers(100); + + makeRemoteTaskRunner(rtrConfig); TestRealtimeTask task1 = new TestRealtimeTask( "realtime1", @@ -635,7 +639,7 @@ public class RemoteTaskRunnerTest Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals(1, - remoteTaskRunner.findWorkerRunningTask(task1.getId()).getCountinouslyFailedTasksCount()); + remoteTaskRunner.findWorkerRunningTask(task1.getId()).getContinuouslyFailedTasksCount()); TestRealtimeTask task2 = new TestRealtimeTask( "realtime2", @@ -650,16 +654,25 @@ public class RemoteTaskRunnerTest mockWorkerCompleteFailedTask(task2); Assert.assertTrue(taskFuture2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(2, - remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount()); + Assert.assertEquals( + 2, + remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount() + ); - remoteTaskRunner.cleanBlackListedNode(remoteTaskRunner.findWorkerRunningTask(task2.getId()), - System.currentTimeMillis() + 2*timeoutPeriod.toStandardDuration().getMillis()); + ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner) + .setCurrentTimeMillis(System.currentTimeMillis()); + remoteTaskRunner.checkBlackListedNodes(); - // After backOffTime the nodes are whitelisted + Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); + + ((RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner) + .setCurrentTimeMillis(System.currentTimeMillis() + 2 * timeoutPeriod.toStandardDuration().getMillis()); + remoteTaskRunner.checkBlackListedNodes(); + + // After backOffTime the nodes are removed from blacklist Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(2, - remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount()); + Assert.assertEquals(0, + remoteTaskRunner.findWorkerRunningTask(task2.getId()).getContinuouslyFailedTasksCount()); TestRealtimeTask task3 = new TestRealtimeTask( "realtime3", @@ -675,6 +688,126 @@ public class RemoteTaskRunnerTest Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals(0, - remoteTaskRunner.findWorkerRunningTask(task3.getId()).getCountinouslyFailedTasksCount()); + remoteTaskRunner.findWorkerRunningTask(task3.getId()).getContinuouslyFailedTasksCount()); + } + + /** + * With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after + * exceeding maxRetriesBeforeBlacklist. + */ + @Test + public void testBlacklistZKWorkers25Percent() throws Exception + { + Period timeoutPeriod = Period.millis(1000); + + rtrTestUtils.makeWorker("worker", 10); + rtrTestUtils.makeWorker("worker2", 10); + + RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod); + rtrConfig.setMaxPercentageBlacklistWorkers(25); + + makeRemoteTaskRunner(rtrConfig); + + for (int i = 1; i < 13; i++) { + String taskId = String.format("rt-%d", i); + TestRealtimeTask task = new TestRealtimeTask( + taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), jsonMapper + ); + + Future taskFuture = remoteTaskRunner.run(task); + + rtrTestUtils.taskAnnounced(i % 2 == 0 ? "worker2" : "worker", task.getId()); + rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 ? "worker2" : "worker", task); + rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 ? "worker2" : "worker", task); + + Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); + Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); + Assert.assertEquals( + ((i + 1) / 2), + remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount() + ); + } + } + + /** + * With 2 workers and maxPercentageBlacklistWorkers(50), one worker should get blacklisted after the second failure + * and the second worker should never be blacklisted even after exceeding maxRetriesBeforeBlacklist. + */ + @Test + public void testBlacklistZKWorkers50Percent() throws Exception + { + Period timeoutPeriod = Period.millis(1000); + + rtrTestUtils.makeWorker("worker", 10); + rtrTestUtils.makeWorker("worker2", 10); + + RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod); + rtrConfig.setMaxPercentageBlacklistWorkers(50); + + makeRemoteTaskRunner(rtrConfig); + + for (int i = 1; i < 13; i++) { + String taskId = String.format("rt-%d", i); + TestRealtimeTask task = new TestRealtimeTask( + taskId, new TaskResource(taskId, 1), "foo", TaskStatus.success(taskId), jsonMapper + ); + + Future taskFuture = remoteTaskRunner.run(task); + + rtrTestUtils.taskAnnounced(i % 2 == 0 || i > 4 ? "worker2" : "worker", task.getId()); + rtrTestUtils.mockWorkerRunningTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task); + rtrTestUtils.mockWorkerCompleteFailedTask(i % 2 == 0 || i > 4 ? "worker2" : "worker", task); + + Assert.assertTrue(taskFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); + Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size()); + Assert.assertEquals( + i > 4 ? i - 2 : ((i + 1) / 2), + remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount() + ); + } + } + + @Test + public void testSuccessfulTaskOnBlacklistedWorker() throws Exception + { + Period timeoutPeriod = Period.millis(1000); + makeWorker(); + + RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod); + rtrConfig.setMaxPercentageBlacklistWorkers(100); + + makeRemoteTaskRunner(rtrConfig); + + TestRealtimeTask task1 = new TestRealtimeTask( + "realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), jsonMapper + ); + TestRealtimeTask task2 = new TestRealtimeTask( + "realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), jsonMapper + ); + TestRealtimeTask task3 = new TestRealtimeTask( + "realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), jsonMapper + ); + + Future taskFuture1 = remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + mockWorkerCompleteFailedTask(task1); + Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); + Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); + + Future taskFuture2 = remoteTaskRunner.run(task2); + Assert.assertTrue(taskAnnounced(task2.getId())); + mockWorkerRunningTask(task2); + + Future taskFuture3 = remoteTaskRunner.run(task3); + Assert.assertTrue(taskAnnounced(task3.getId())); + mockWorkerRunningTask(task3); + mockWorkerCompleteFailedTask(task3); + Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure()); + Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); + + mockWorkerCompleteSuccessfulTask(task2); + Assert.assertTrue(taskFuture2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess()); + Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index bacfc5097e2..882130d52de 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -21,7 +21,9 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.metamx.http.client.HttpClient; import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.PathChildrenCacheFactory; @@ -46,6 +48,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.zookeeper.CreateMode; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; /** @@ -113,7 +116,7 @@ public class RemoteTaskRunnerTestUtils ProvisioningStrategy provisioningStrategy ) { - RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner( + RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner( jsonMapper, config, new IndexerZkConfig( @@ -138,13 +141,13 @@ public class RemoteTaskRunnerTestUtils return remoteTaskRunner; } - Worker makeWorker(final String workerId) throws Exception + Worker makeWorker(final String workerId, final int capacity) throws Exception { Worker worker = new Worker( "http", workerId, workerId, - 3, + capacity, "0" ); @@ -153,7 +156,7 @@ public class RemoteTaskRunnerTestUtils jsonMapper.writeValueAsBytes(worker) ); cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, workerId)); - + return worker; } @@ -215,4 +218,45 @@ public class RemoteTaskRunnerTestUtils } ); } + + public static class TestableRemoteTaskRunner extends RemoteTaskRunner + { + private long currentTimeMillis = System.currentTimeMillis(); + + public TestableRemoteTaskRunner( + ObjectMapper jsonMapper, + RemoteTaskRunnerConfig config, + IndexerZkConfig indexerZkConfig, + CuratorFramework cf, + PathChildrenCacheFactory.Builder pathChildrenCacheFactory, + HttpClient httpClient, + Supplier workerConfigRef, + ScheduledExecutorService cleanupExec, + ProvisioningStrategy provisioningStrategy + ) + { + super( + jsonMapper, + config, + indexerZkConfig, + cf, + pathChildrenCacheFactory, + httpClient, + workerConfigRef, + cleanupExec, + provisioningStrategy + ); + } + + void setCurrentTimeMillis(long currentTimeMillis) + { + this.currentTimeMillis = currentTimeMillis; + } + + @Override + protected long getCurrentTimeMillis() + { + return currentTimeMillis; + } + } }