Blacklist workers if they fail for too many times (#3643)

* Blacklist workers if they fail for too many times

* Adding documentation

* Changing to timeout to period and updating docs

* 1. Add configurable maxPercentageBlacklistWorkers
2. Rename variable

* Change maxPercentageBlacklistWorkers to double

* Remove thread.sleep
This commit is contained in:
Niketh Sabbineni 2016-11-28 23:08:56 -08:00 committed by Nishant
parent 6922d684bf
commit 2640d170c3
9 changed files with 634 additions and 41 deletions

View File

@ -93,6 +93,10 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M|
|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|1|
|`druid.indexer.runner.maxRetriesBeforeBlacklist`|Number of consecutive times the middle manager can fail tasks, before the worker is blacklisted, must be at least 1|5|
|`druid.indexer.runner.workerBlackListBackoffTime`|How long to wait before a blacklisted worker is whitelisted again. This value should be greater than the value set for `workerBlackListCleanupPeriod`.|PT15M|
|`druid.indexer.runner.workerBlackListCleanupPeriod`|How often the cleanup thread runs to check whether blacklisted workers can be whitelisted again.|PT5M|
|`druid.indexer.runner.maxPercentageBlacklistWorkers`|The maximum percentage of workers to blacklist, this must be between 0 and 100.|20|
There are additional configs for autoscaling (if it is enabled):

View File

@ -68,6 +68,18 @@ The overlord console can be used to view pending tasks, running tasks, available
http://<OVERLORD_IP>:<port>/console.html
```
#### Blacklisted Workers
If a worker fails more consecutive tasks than a configured threshold, the overlord will blacklist that worker. By default, no more than 20% of the nodes can be blacklisted. Blacklisted nodes will be periodically whitelisted.
The following variables can be used to set the threshold, the blacklist timeouts, and the maximum percentage of blacklisted workers.
```
druid.indexer.runner.maxRetriesBeforeBlacklist
druid.indexer.runner.workerBlackListBackoffTime
druid.indexer.runner.workerBlackListCleanupPeriod
druid.indexer.runner.maxPercentageBlacklistWorkers
```
#### Autoscaling
The Autoscaling mechanisms currently in place are tightly coupled with our deployment infrastructure but the framework should be in place for other implementations. We are highly open to new implementations or extensions of the existing mechanisms. In our own deployments, middle manager nodes are Amazon AWS EC2 nodes and they are provisioned to register themselves in a [galaxy](https://github.com/ning/galaxy) environment.

View File

@ -66,6 +66,7 @@ import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.initialization.IndexerZkConfig;
@ -82,6 +83,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.io.IOException;
import java.io.InputStream;
@ -90,9 +92,11 @@ import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -151,6 +155,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy.
private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<>();
// Workers that have been blacklisted.
private final Set<ZkWorker> blackListedWorkers = Collections.synchronizedSet(new HashSet<ZkWorker>());
// task runner listeners
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
@ -309,6 +316,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
waitingForMonitor.wait();
}
}
scheduleBlackListedNodesCleanUp();
resourceManagement.startManagement(this);
started = true;
}
@ -735,7 +743,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
public boolean apply(Map.Entry<String, ZkWorker> input)
{
return !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey());
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
!blackListedWorkers.contains(input.getValue());
}
}
),
@ -1049,6 +1058,39 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
lazyWorkers.remove(worker.getHost());
}
/**
 * Schedule a task that will clean the blackListed ZK Workers periodically.
 *
 * The task is submitted to {@code cleanupExec} with a zero initial delay and is repeated every
 * {@code config.getWorkerBlackListCleanupPeriod()}. On each run every worker that is blacklisted at that
 * moment is handed to {@link #cleanBlackListedNode(ZkWorker, long)} together with the current wall-clock
 * time; that method decides whether the worker is removed from the blacklist.
 */
private void scheduleBlackListedNodesCleanUp()
{
  ScheduledExecutors.scheduleAtFixedRate(
      cleanupExec,
      Period.ZERO.toStandardDuration(),
      config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
      new Runnable()
      {
        @Override
        public void run()
        {
          final long currentTimeStamp = System.currentTimeMillis();
          // cleanBlackListedNode() removes entries from blackListedWorkers. Removing from the set while
          // a for-each loop is walking it would invalidate the iterator (ConcurrentModificationException),
          // and iterating a Collections.synchronizedSet without holding its lock is unsafe anyway.
          // toArray() on the synchronized wrapper copies the elements under the set's lock, so the loop
          // below works on a stable snapshot.
          final ZkWorker[] blackListedSnapshot = blackListedWorkers.toArray(new ZkWorker[0]);
          for (ZkWorker zkWorker : blackListedSnapshot) {
            cleanBlackListedNode(zkWorker, currentTimeStamp);
          }
        }
      }
  );
}
/**
 * Removes {@code zkWorker} from the blacklist when its backoff time has elapsed.
 *
 * The worker is whitelisted when {@code currentTimeStamp} minus the worker's last completed task time is
 * strictly greater than {@code config.getWorkerBlackListBackoffTime()}. The worker's failure counter is
 * not touched by this method.
 *
 * @param zkWorker         the worker to examine
 * @param currentTimeStamp the time, in epoch milliseconds, to evaluate the backoff against
 */
public void cleanBlackListedNode(ZkWorker zkWorker, long currentTimeStamp)
{
  final long millisSinceLastCompletion = currentTimeStamp - zkWorker.getLastCompletedTaskTime().getMillis();
  final long backoffMillis = config.getWorkerBlackListBackoffTime().toStandardDuration().getMillis();

  if (millisSinceLastCompletion <= backoffMillis) {
    // Backoff has not elapsed yet; leave the blacklist untouched.
    return;
  }

  log.info("Whitelisting worker [%s]. ", zkWorker);
  blackListedWorkers.remove(zkWorker);
}
/**
* Schedule a task that will, at some point in the future, clean up znodes and issue failures for "tasksToFail"
* if they are being run by "worker".
@ -1151,6 +1193,20 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
runningTasks.remove(taskStatus.getId());
// Update success/failure counters
if(taskStatus.isSuccess()){
zkWorker.resetCountinouslyFailedTasksCount();
} else if(taskStatus.isFailure()){
zkWorker.incrementCountinouslyFailedTasksCount();
}
// BlackList node if there are too many failures.
if(zkWorker.getCountinouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
blackListedWorkers.size() <=
zkWorkers.size()*(config.getMaxPercentageBlacklistWorkers()/100)){
blackListedWorkers.add(zkWorker);
}
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
@ -1243,6 +1299,11 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
/**
 * Returns the currently blacklisted workers, converted by {@code getImmutableWorkerFromZK}.
 *
 * @return the result of {@code getImmutableWorkerFromZK} applied to the blacklist
 */
public Collection<ImmutableWorkerInfo> getBlackListedWorkers()
{
  // blackListedWorkers is a Collections.synchronizedSet, which is only safe to traverse while its
  // monitor is held; other threads add to and remove from it.
  // NOTE(review): this assumes getImmutableWorkerFromZK copies the set eagerly before returning - confirm.
  synchronized (blackListedWorkers) {
    return getImmutableWorkerFromZK(blackListedWorkers);
  }
}
@VisibleForTesting
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
{

View File

@ -39,6 +39,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -51,6 +52,7 @@ public class ZkWorker implements Closeable
private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
private AtomicInteger countinouslyFailedTasksCount = new AtomicInteger(0);
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
@ -168,6 +170,18 @@ public class ZkWorker implements Closeable
statusCache.close();
}
/**
 * @return the current value of the failed-tasks counter held by this worker
 */
public int getCountinouslyFailedTasksCount()
{
  return countinouslyFailedTasksCount.intValue();
}
/**
 * Sets the failed-tasks counter of this worker back to zero.
 */
public void resetCountinouslyFailedTasksCount()
{
  countinouslyFailedTasksCount.set(0);
}
/**
 * Atomically adds one to the failed-tasks counter of this worker.
 */
public void incrementCountinouslyFailedTasksCount()
{
  countinouslyFailedTasksCount.getAndIncrement();
}
@Override
public String toString()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.curator.CuratorUtils;
import org.joda.time.Period;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@ -49,6 +50,23 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
@Min(1)
private int pendingTasksRunnerNumThreads = 1;
// Number of consecutive task failures a worker may have before it is blacklisted.
// Validated to be at least 1; defaults to 5.
@JsonProperty
@Min(1)
private int maxRetriesBeforeBlacklist = 5;
// How long to wait before a blacklisted worker is whitelisted again. Defaults to 15 minutes.
@JsonProperty
@NotNull
private Period workerBlackListBackoffTime = new Period("PT15M");
// Period of the cleanup task that checks blacklisted workers. Defaults to 5 minutes.
@JsonProperty
@NotNull
private Period workerBlackListCleanupPeriod = new Period("PT5M");
// Maximum percentage of workers that may be blacklisted, expressed on a 0-100 scale. Defaults to 20.
@JsonProperty
@Max(100)
@Min(0)
private double maxPercentageBlacklistWorkers = 20;
public Period getTaskAssignmentTimeout()
{
return taskAssignmentTimeout;
@ -75,6 +93,38 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
return pendingTasksRunnerNumThreads;
}
/**
 * @return the configured {@code maxRetriesBeforeBlacklist} value
 */
public int getMaxRetriesBeforeBlacklist()
{
  return this.maxRetriesBeforeBlacklist;
}
/**
 * Replaces the configured {@code maxRetriesBeforeBlacklist} value.
 *
 * @param maxRetriesBeforeBlacklist the new value; stored as given
 */
public void setMaxRetriesBeforeBlacklist(int maxRetriesBeforeBlacklist)
{
  this.maxRetriesBeforeBlacklist = maxRetriesBeforeBlacklist;
}
/**
 * @return the configured {@code workerBlackListBackoffTime} period
 */
public Period getWorkerBlackListBackoffTime()
{
  return this.workerBlackListBackoffTime;
}
/**
 * Sets the {@code workerBlackListBackoffTime} period.
 *
 * The method name does not match the property it writes: it takes a {@link Period} (not milliseconds)
 * and stores it in {@code workerBlackListBackoffTime}. It is kept so that existing callers keep
 * compiling; new code should prefer {@link #setWorkerBlackListBackoffTime(Period)}.
 *
 * @param taskBlackListBackoffTime the new backoff period; stored as given
 */
public void setTaskBlackListBackoffTimeMillis(Period taskBlackListBackoffTime)
{
  setWorkerBlackListBackoffTime(taskBlackListBackoffTime);
}

/**
 * Sets the {@code workerBlackListBackoffTime} period. This is the setter counterpart of
 * {@code getWorkerBlackListBackoffTime()}.
 *
 * @param workerBlackListBackoffTime the new backoff period; stored as given
 */
public void setWorkerBlackListBackoffTime(Period workerBlackListBackoffTime)
{
  this.workerBlackListBackoffTime = workerBlackListBackoffTime;
}
/**
 * @return the configured {@code workerBlackListCleanupPeriod} period
 */
public Period getWorkerBlackListCleanupPeriod()
{
  return this.workerBlackListCleanupPeriod;
}
/**
 * Replaces the configured {@code workerBlackListCleanupPeriod} period.
 *
 * @param workerBlackListCleanupPeriod the new period; stored as given
 */
public void setWorkerBlackListCleanupPeriod(Period workerBlackListCleanupPeriod)
{
  this.workerBlackListCleanupPeriod = workerBlackListCleanupPeriod;
}
/**
 * @return the configured {@code maxPercentageBlacklistWorkers} value
 */
public double getMaxPercentageBlacklistWorkers()
{
  return this.maxPercentageBlacklistWorkers;
}
/**
 * Replaces the configured {@code maxPercentageBlacklistWorkers} value.
 *
 * The parameter is a {@code double} so that it matches the field and
 * {@code getMaxPercentageBlacklistWorkers()}; with an {@code int} parameter a fractional percentage
 * could not be passed through this setter. Callers that pass an {@code int} still compile because the
 * argument is widened.
 *
 * @param maxPercentageBlacklistWorkers the new percentage; stored as given
 */
public void setMaxPercentageBlacklistWorkers(double maxPercentageBlacklistWorkers)
{
  this.maxPercentageBlacklistWorkers = maxPercentageBlacklistWorkers;
}
@Override
public boolean equals(Object o)
{
@ -102,7 +152,19 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) {
return false;
}
return taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout);
if (!taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout)) {
return false;
}
if (maxRetriesBeforeBlacklist != that.maxRetriesBeforeBlacklist) {
return false;
}
if (!workerBlackListBackoffTime.equals(that.getWorkerBlackListBackoffTime())) {
return false;
}
if (maxPercentageBlacklistWorkers != that.maxPercentageBlacklistWorkers) {
return false;
}
return workerBlackListCleanupPeriod.equals(that.workerBlackListCleanupPeriod);
}
@ -115,6 +177,10 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
result = 31 * result + maxZnodeBytes;
result = 31 * result + taskShutdownLinkTimeout.hashCode();
result = 31 * result + pendingTasksRunnerNumThreads;
result = 31 * result + maxRetriesBeforeBlacklist;
result = 31 * result + workerBlackListBackoffTime.hashCode();
result = 31 * result + workerBlackListCleanupPeriod.hashCode();
result = 31 * result + (int)maxPercentageBlacklistWorkers;
return result;
}
@ -128,6 +194,10 @@ public class RemoteTaskRunnerConfig extends WorkerTaskRunnerConfig
", maxZnodeBytes=" + maxZnodeBytes +
", taskShutdownLinkTimeout=" + taskShutdownLinkTimeout +
", pendingTasksRunnerNumThreads=" + pendingTasksRunnerNumThreads +
", maxRetriesBeforeBlacklist=" + maxRetriesBeforeBlacklist +
", taskBlackListBackoffTimeMillis=" + workerBlackListBackoffTime +
", taskBlackListCleanupPeriod=" + workerBlackListCleanupPeriod +
", maxPercentageBlacklistWorkers= " + maxPercentageBlacklistWorkers +
'}';
}
}

View File

@ -481,6 +481,11 @@ public class RemoteTaskRunnerTest
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker", task);
}
/**
 * Delegates to {@code rtrTestUtils.mockWorkerCompleteFailedTask} for the worker id {@code "worker"}.
 *
 * @param task the task to report as failed
 */
private void mockWorkerCompleteFailedTask(final Task task) throws Exception
{
  final String workerId = "worker";
  rtrTestUtils.mockWorkerCompleteFailedTask(workerId, task);
}
@Test
public void testFindLazyWorkerTaskRunning() throws Exception
{
@ -608,4 +613,68 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(item2, workItems.get(1));
Assert.assertEquals(item1, workItems.get(2));
}
/**
 * Exercises the worker blacklist end to end with a single worker: two failed tasks in a row put the
 * worker on the blacklist, cleanBlackListedNode() with a timestamp past the backoff takes it off again,
 * and a subsequent successful task resets the worker's failure counter.
 */
@Test
public void testBlacklistZKWorkers() throws Exception
{
Period timeoutPeriod = Period.millis(1000);
makeWorker();
// NOTE(review): TestRemoteTaskRunnerConfig presumably uses timeoutPeriod as the blacklist backoff and
// cleanup period, and reports a max-retries-before-blacklist of 1 - confirm against that class.
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(timeoutPeriod));
// NOTE(review): the task is constructed with a success status although the worker is mocked to report
// a failure below; presumably the constructor status is not what the runner observes here - confirm.
TestRealtimeTask task1 = new TestRealtimeTask(
"realtime1",
new TaskResource("realtime1", 1),
"foo",
TaskStatus.success("realtime1"),
jsonMapper
);
Future<TaskStatus> taskFuture1 = remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
mockWorkerCompleteFailedTask(task1);
Assert.assertTrue(taskFuture1.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
// First failure: the counter is 1 and the worker is not blacklisted yet.
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(1,
remoteTaskRunner.findWorkerRunningTask(task1.getId()).getCountinouslyFailedTasksCount());
TestRealtimeTask task2 = new TestRealtimeTask(
"realtime2",
new TaskResource("realtime2", 1),
"foo",
TaskStatus.running("realtime2"),
jsonMapper
);
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
Assert.assertTrue(taskAnnounced(task2.getId()));
mockWorkerRunningTask(task2);
mockWorkerCompleteFailedTask(task2);
Assert.assertTrue(taskFuture2.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isFailure());
// Second consecutive failure: the counter is 2 and the worker is now blacklisted.
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(2,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount());
// Call the cleanup directly with a timestamp two timeout periods in the future instead of waiting
// for the scheduled cleanup to run.
remoteTaskRunner.cleanBlackListedNode(remoteTaskRunner.findWorkerRunningTask(task2.getId()),
System.currentTimeMillis() + 2*timeoutPeriod.toStandardDuration().getMillis());
// After backOffTime the nodes are whitelisted; the failure counter is expected to be unchanged.
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(2,
remoteTaskRunner.findWorkerRunningTask(task2.getId()).getCountinouslyFailedTasksCount());
TestRealtimeTask task3 = new TestRealtimeTask(
"realtime3",
new TaskResource("realtime3", 1),
"foo",
TaskStatus.running("realtime3"),
jsonMapper
);
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);
mockWorkerCompleteSuccessfulTask(task3);
Assert.assertTrue(taskFuture3.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).isSuccess());
// A successful task resets the failure counter to 0 and the worker stays off the blacklist.
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0,
remoteTaskRunner.findWorkerRunningTask(task3.getId()).getCountinouslyFailedTasksCount());
}
}

View File

@ -173,6 +173,12 @@ public class RemoteTaskRunnerTestUtils
cf.setData().forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
/**
 * Writes a failure announcement for {@code task} to the task's status path under {@code workerId}.
 *
 * @param workerId id of the worker whose status path is written
 * @param task     the task to announce as failed
 */
void mockWorkerCompleteFailedTask(final String workerId, final Task task) throws Exception
{
  final TaskStatus failedStatus = TaskStatus.failure(task.getId());
  final TaskAnnouncement failureAnnouncement = TaskAnnouncement.create(task, failedStatus, DUMMY_LOCATION);

  final String taskStatusPath = joiner.join(statusPath, workerId, task.getId());
  final byte[] announcementBytes = jsonMapper.writeValueAsBytes(failureAnnouncement);

  cf.setData().forPath(taskStatusPath, announcementBytes);
}
boolean workerRunningTask(final String workerId, final String taskId)
{
return pathExists(joiner.join(statusPath, workerId, taskId));

View File

@ -63,4 +63,22 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
return "";
}
/**
 * Fixed retry threshold used by this test config.
 *
 * @return always {@code 1}
 */
@Override
public int getMaxRetriesBeforeBlacklist()
{
  final int retriesBeforeBlacklist = 1;
  return retriesBeforeBlacklist;
}
/**
 * Uses this test config's {@code timeout} as the blacklist backoff time.
 *
 * @return the {@code timeout} period
 */
@Override
public Period getWorkerBlackListBackoffTime()
{
  return this.timeout;
}
/**
 * Uses this test config's {@code timeout} as the blacklist cleanup period.
 *
 * @return the {@code timeout} period
 */
@Override
public Period getWorkerBlackListCleanupPeriod()
{
  return this.timeout;
}
}

View File

@ -36,6 +36,9 @@ public class RemoteTaskRunnerConfigTest
private static final String DEFAULT_VERSION = "";
private static final long DEFAULT_MAX_ZNODE = 10 * 1024;
private static final int DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS = 5;
private static final int DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST = 5;
private static final Period DEFAULT_TASK_BACKOFF = new Period("PT10M");
private static final Period DEFAULT_BLACKLIST_CLEANUP_PERIOD = new Period("PT5M");
@Test
public void testGetTaskAssignmentTimeout() throws Exception
@ -49,7 +52,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getTaskAssignmentTimeout()
);
}
@ -66,7 +72,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getPendingTasksRunnerNumThreads()
);
}
@ -83,7 +92,10 @@ public class RemoteTaskRunnerConfigTest
version,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getMinWorkerVersion()
);
}
@ -100,7 +112,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
max,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getMaxZnodeBytes()
);
}
@ -117,7 +132,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getTaskShutdownLinkTimeout()
);
}
@ -134,11 +152,74 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).getTaskCleanupTimeout()
);
}
/**
 * Checks that the {@code maxRetriesBeforeBlacklist} value given to
 * {@code generateRemoteTaskRunnerConfig} is returned by the getter after passing through {@code reflect}.
 */
@Test
public void testGetMaxRetriesBeforeBlacklist() throws Exception
{
  final int expectedMaxRetries = 2;
  final RemoteTaskRunnerConfig reflected = reflect(
      generateRemoteTaskRunnerConfig(
          DEFAULT_TIMEOUT,
          DEFAULT_TIMEOUT,
          DEFAULT_VERSION,
          DEFAULT_MAX_ZNODE,
          DEFAULT_TIMEOUT,
          DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
          expectedMaxRetries,
          DEFAULT_TASK_BACKOFF,
          DEFAULT_BLACKLIST_CLEANUP_PERIOD
      )
  );
  Assert.assertEquals(expectedMaxRetries, reflected.getMaxRetriesBeforeBlacklist());
}
/**
 * Checks that the backoff period given to {@code generateRemoteTaskRunnerConfig} is returned by
 * {@code getWorkerBlackListBackoffTime()} after passing through {@code reflect}.
 */
@Test
public void testGetWorkerBlackListBackoffTime() throws Exception
{
  final Period expectedBackoff = new Period("PT1M");
  final RemoteTaskRunnerConfig reflected = reflect(
      generateRemoteTaskRunnerConfig(
          DEFAULT_TIMEOUT,
          DEFAULT_TIMEOUT,
          DEFAULT_VERSION,
          DEFAULT_MAX_ZNODE,
          DEFAULT_TIMEOUT,
          DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
          DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
          expectedBackoff,
          DEFAULT_BLACKLIST_CLEANUP_PERIOD
      )
  );
  Assert.assertEquals(expectedBackoff, reflected.getWorkerBlackListBackoffTime());
}
/**
 * Checks that the cleanup period given to {@code generateRemoteTaskRunnerConfig} is returned by
 * {@code getWorkerBlackListCleanupPeriod()} after passing through {@code reflect}.
 */
@Test
public void testGetTaskBlackListCleanupPeriod() throws Exception
{
  final Period expectedCleanupPeriod = Period.years(100);
  final RemoteTaskRunnerConfig reflected = reflect(
      generateRemoteTaskRunnerConfig(
          DEFAULT_TIMEOUT,
          DEFAULT_TIMEOUT,
          DEFAULT_VERSION,
          DEFAULT_MAX_ZNODE,
          DEFAULT_TIMEOUT,
          DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
          DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
          DEFAULT_TASK_BACKOFF,
          expectedCleanupPeriod
      )
  );
  Assert.assertEquals(expectedCleanupPeriod, reflected.getWorkerBlackListCleanupPeriod());
}
@Test
public void testEquals() throws Exception
{
@ -149,7 +230,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
@ -157,13 +241,19 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
))
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
final int pendingTasksRunnerNumThreads = 20;
final int maxRetriesBeforeBlacklist = 1;
final Period taskBlackListBackoffTime = new Period("PT1M");
final Period taskBlackListCleanupPeriod = Period.years(10);
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -171,7 +261,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -179,7 +272,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
@ -189,7 +285,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
@ -197,7 +296,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
@ -207,7 +309,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -215,7 +320,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
@ -225,7 +333,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -233,7 +344,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
@ -244,7 +358,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -252,7 +369,10 @@ public class RemoteTaskRunnerConfigTest
version,
DEFAULT_MAX_ZNODE,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
@ -264,7 +384,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -272,7 +395,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
@ -283,7 +409,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -291,9 +420,87 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
DEFAULT_TASK_BACKOFF,
taskBlackListCleanupPeriod
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
))
);
}
@Test
@ -306,7 +513,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
@ -314,13 +524,19 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
DEFAULT_TASK_BACKOFF,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).hashCode()
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
final int pendingTasksRunnerNumThreads = 20;
final int maxRetriesBeforeBlacklist = 80;
final Period taskBlackListBackoffTime = new Period("PT1M");
final Period taskBlackListCleanupPeriod = Period.years(10);
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -328,7 +544,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -336,7 +555,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
@ -346,7 +568,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
@ -354,7 +579,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
@ -364,7 +592,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -372,7 +603,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
@ -382,7 +616,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -390,7 +627,10 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_VERSION,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
@ -401,7 +641,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -409,7 +652,10 @@ public class RemoteTaskRunnerConfigTest
version,
DEFAULT_MAX_ZNODE,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
@ -421,7 +667,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -429,7 +678,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
@ -440,7 +692,10 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
pendingTasksRunnerNumThreads
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
@ -448,9 +703,87 @@ public class RemoteTaskRunnerConfigTest
version,
max,
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
DEFAULT_MAX_RETRIES_BEFORE_BLACKLIST,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
DEFAULT_TASK_BACKOFF,
taskBlackListCleanupPeriod
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
taskBlackListCleanupPeriod
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads,
maxRetriesBeforeBlacklist,
taskBlackListBackoffTime,
DEFAULT_BLACKLIST_CLEANUP_PERIOD
)).hashCode()
);
}
private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException
@ -464,7 +797,10 @@ public class RemoteTaskRunnerConfigTest
String minWorkerVersion,
long maxZnodeBytes,
Period taskShutdownLinkTimeout,
int pendingTasksRunnerNumThreads
int pendingTasksRunnerNumThreads,
int maxRetriesBeforeBlacklist,
Period taskBlackListBackoffTime,
Period taskBlackListCleanupPeriod
)
{
final Map<String, Object> objectMap = new HashMap<>();
@ -474,6 +810,9 @@ public class RemoteTaskRunnerConfigTest
objectMap.put("maxZnodeBytes", maxZnodeBytes);
objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout);
objectMap.put("pendingTasksRunnerNumThreads", pendingTasksRunnerNumThreads);
objectMap.put("maxRetriesBeforeBlacklist", maxRetriesBeforeBlacklist);
objectMap.put("workerBlackListBackoffTime", taskBlackListBackoffTime);
objectMap.put("workerBlackListCleanupPeriod", taskBlackListCleanupPeriod);
return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class);
}
}