RemoteTaskRunner (RTR) now uses multiple threads to assign pending tasks to workers.

This commit is contained in:
Himanshu Gupta 2016-02-22 15:42:15 -06:00
parent 1fe277ee29
commit bc156effe7
4 changed files with 248 additions and 92 deletions

View File

@ -82,6 +82,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M|
|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads used to assign pending tasks to workers. Must be at least 1.|3|
There are additional configs for autoscaling (if it is enabled):

View File

@ -54,6 +54,7 @@ import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskLocation;
@ -96,7 +97,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -143,7 +143,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// tasks that are complete but not cleaned up yet
private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
private final ExecutorService runPendingTasksExec;
// Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy.
private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<>();
@ -151,6 +151,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// task runner listeners
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
// Workers that have been assigned a task but have not yet acknowledged it.
// Map: workerId -> taskId
private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<>();
// Map: taskId -> taskId. Tasks for which an assignment attempt to a worker is currently in progress.
private final ConcurrentMap<String, String> tryAssignTasks = new ConcurrentHashMap<>();
private final Object statusLock = new Object();
private volatile boolean started = false;
@ -183,6 +189,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.resourceManagement = resourceManagement;
this.runPendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
);
}
@Override
@ -320,6 +330,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
zkWorker.close();
}
workerPathCache.close();
if (runPendingTasksExec != null) {
runPendingTasksExec.shutdown();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -548,7 +562,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
/**
* This method uses a single threaded executor to extract all pending tasks and attempt to run them. Any tasks that
* This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that
* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
* This method should be run each time there is new worker capacity or if new tasks are assigned.
*/
@ -566,17 +580,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
try {
if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
finally {
tryAssignTasks.remove(taskId);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
}
@ -650,40 +669,56 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
} else {
strategy = workerConfig.getSelectStrategy();
}
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
ZkWorker assignedWorker = null;
try {
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Maps.filterEntries(
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
{
return !lazyWorkers.containsKey(input.getKey());
@Override
public boolean apply(Map.Entry<String, ZkWorker> input)
{
return !lazyWorkers.containsKey(input.getKey()) &&
!workersWithUnacknowledgedTask.containsKey(input.getKey());
}
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
{
@Override
public ImmutableZkWorker transformEntry(
String key, ZkWorker value
)
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
{
return value.toImmutable();
@Override
public ImmutableZkWorker transformEntry(
String key, ZkWorker value
)
{
return value.toImmutable();
}
}
}
)
),
task
);
if (immutableZkWorker.isPresent()) {
final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
return announceTask(task, zkWorker, taskRunnerWorkItem);
} else {
)
),
task
);
if (immutableZkWorker.isPresent()
&&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
return announceTask(task, assignedWorker, taskRunnerWorkItem);
}
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
} finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
// note that this is essential as a task might not get a worker because a worker was assigned another task.
// so this will ensure that other pending tasks are tried for assignment again.
runPendingTasks();
}
}
}
}
@ -741,8 +776,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= waitMs) {
log.error(
"Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!",
log.makeAlert(
"Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
worker,
task.getId(),
elapsed,

View File

@ -48,6 +48,10 @@ public class RemoteTaskRunnerConfig
@JsonProperty
private Period taskShutdownLinkTimeout = new Period("PT1M");
@JsonProperty
@Min(1)
private int pendingTasksRunnerNumThreads = 3;
public Period getTaskAssignmentTimeout()
{
return taskAssignmentTimeout;
@ -73,6 +77,12 @@ public class RemoteTaskRunnerConfig
return taskShutdownLinkTimeout;
}
/**
 * Returns the configured number of threads used to assign pending tasks to workers.
 *
 * @return the value of the {@code pendingTasksRunnerNumThreads} property
 */
public int getPendingTasksRunnerNumThreads()
{
  return this.pendingTasksRunnerNumThreads;
}
@Override
public boolean equals(Object o)
{
@ -85,30 +95,47 @@ public class RemoteTaskRunnerConfig
RemoteTaskRunnerConfig that = (RemoteTaskRunnerConfig) o;
if (getMaxZnodeBytes() != that.getMaxZnodeBytes()) {
if (maxZnodeBytes != that.maxZnodeBytes) {
return false;
}
if (!getTaskAssignmentTimeout().equals(that.getTaskAssignmentTimeout())) {
if (pendingTasksRunnerNumThreads != that.pendingTasksRunnerNumThreads) {
return false;
}
if (!getTaskCleanupTimeout().equals(that.getTaskCleanupTimeout())) {
if (!taskAssignmentTimeout.equals(that.taskAssignmentTimeout)) {
return false;
}
if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) {
if (!taskCleanupTimeout.equals(that.taskCleanupTimeout)) {
return false;
}
return getTaskShutdownLinkTimeout().equals(that.getTaskShutdownLinkTimeout());
if (!minWorkerVersion.equals(that.minWorkerVersion)) {
return false;
}
return taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout);
}
@Override
public int hashCode()
{
int result = getTaskAssignmentTimeout().hashCode();
result = 31 * result + getTaskCleanupTimeout().hashCode();
result = 31 * result + getMinWorkerVersion().hashCode();
result = 31 * result + getMaxZnodeBytes();
result = 31 * result + getTaskShutdownLinkTimeout().hashCode();
int result = taskAssignmentTimeout.hashCode();
result = 31 * result + taskCleanupTimeout.hashCode();
result = 31 * result + minWorkerVersion.hashCode();
result = 31 * result + maxZnodeBytes;
result = 31 * result + taskShutdownLinkTimeout.hashCode();
result = 31 * result + pendingTasksRunnerNumThreads;
return result;
}
/**
 * Builds a single-line, human-readable description of this config that lists each
 * field as {@code name=value} inside {@code RemoteTaskRunnerConfig{...}}.
 *
 * @return the textual representation of this config
 */
@Override
public String toString()
{
  final StringBuilder description = new StringBuilder("RemoteTaskRunnerConfig{");
  description.append("taskAssignmentTimeout=").append(taskAssignmentTimeout);
  description.append(", taskCleanupTimeout=").append(taskCleanupTimeout);
  // minWorkerVersion is the only field rendered inside single quotes.
  description.append(", minWorkerVersion='").append(minWorkerVersion).append('\'');
  description.append(", maxZnodeBytes=").append(maxZnodeBytes);
  description.append(", taskShutdownLinkTimeout=").append(taskShutdownLinkTimeout);
  description.append(", pendingTasksRunnerNumThreads=").append(pendingTasksRunnerNumThreads);
  description.append('}');
  return description.toString();
}
}

View File

@ -35,6 +35,7 @@ public class RemoteTaskRunnerConfigTest
private static final Period DEFAULT_TIMEOUT = Period.ZERO;
private static final String DEFAULT_VERSION = "";
private static final long DEFAULT_MAX_ZNODE = 10 * 1024;
private static final int DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS = 5;
@Test
public void testGetTaskAssignmentTimeout() throws Exception
@ -47,24 +48,26 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).getTaskAssignmentTimeout()
);
}
@Test
public void testGetTaskCleanupTimeout() throws Exception
public void testGetPendingTasksRunnerNumThreads() throws Exception
{
final Period timeout = Period.hours(1);
final int pendingTasksRunnerNumThreads = 20;
Assert.assertEquals(
timeout,
pendingTasksRunnerNumThreads,
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
)).getTaskCleanupTimeout()
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
)).getPendingTasksRunnerNumThreads()
);
}
@ -79,7 +82,8 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
version,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).getMinWorkerVersion()
);
}
@ -95,7 +99,8 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
max,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).getMaxZnodeBytes()
);
}
@ -111,11 +116,29 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
timeout
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).getTaskShutdownLinkTimeout()
);
}
/**
 * Checks that a task cleanup timeout supplied when generating a config is the value
 * returned by {@code getTaskCleanupTimeout()} once the config has been passed
 * through {@code reflect(...)}.
 */
@Test
public void testGetTaskCleanupTimeout() throws Exception
{
  final Period expectedCleanupTimeout = Period.hours(1);

  // Only the cleanup timeout (second argument) deviates from the defaults.
  final RemoteTaskRunnerConfig generated = generateRemoteTaskRunnerConfig(
      DEFAULT_TIMEOUT,
      expectedCleanupTimeout,
      DEFAULT_VERSION,
      DEFAULT_MAX_ZNODE,
      DEFAULT_TIMEOUT,
      DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
  );
  final RemoteTaskRunnerConfig reflected = reflect(generated);

  Assert.assertEquals(expectedCleanupTimeout, reflected.getTaskCleanupTimeout());
}
@Test
public void testEquals() throws Exception
{
@ -125,33 +148,38 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
))
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
final int pendingTasksRunnerNumThreads = 20;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
))
);
Assert.assertNotEquals(
@ -160,14 +188,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
))
);
Assert.assertNotEquals(
@ -176,14 +206,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
))
);
Assert.assertNotEquals(
@ -192,14 +224,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
))
);
@ -209,14 +243,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
timeout,
pendingTasksRunnerNumThreads
))
);
@ -227,16 +263,37 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
))
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads
)),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
))
);
}
@Test
@ -248,33 +305,38 @@ public class RemoteTaskRunnerConfigTest
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_VERSION,
DEFAULT_MAX_ZNODE,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).hashCode()
);
final Period timeout = Period.years(999);
final String version = "someVersion";
final long max = 20 * 1024;
final int pendingTasksRunnerNumThreads = 20;
Assert.assertEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode()
);
Assert.assertNotEquals(
@ -283,14 +345,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
DEFAULT_TIMEOUT,
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode()
);
Assert.assertNotEquals(
@ -299,14 +363,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
DEFAULT_TIMEOUT,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode()
);
Assert.assertNotEquals(
@ -315,14 +381,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
DEFAULT_VERSION,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode()
);
@ -332,14 +400,16 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
DEFAULT_MAX_ZNODE,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode()
);
@ -350,16 +420,37 @@ public class RemoteTaskRunnerConfigTest
timeout,
version,
max,
timeout
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
DEFAULT_TIMEOUT
DEFAULT_TIMEOUT,
pendingTasksRunnerNumThreads
)).hashCode()
);
Assert.assertNotEquals(
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
pendingTasksRunnerNumThreads
)).hashCode(),
reflect(generateRemoteTaskRunnerConfig(
timeout,
timeout,
version,
max,
timeout,
DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS
)).hashCode()
);
}
private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException
@ -372,7 +463,8 @@ public class RemoteTaskRunnerConfigTest
Period taskCleanupTimeout,
String minWorkerVersion,
long maxZnodeBytes,
Period taskShutdownLinkTimeout
Period taskShutdownLinkTimeout,
int pendingTasksRunnerNumThreads
)
{
final Map<String, Object> objectMap = new HashMap<>();
@ -381,6 +473,7 @@ public class RemoteTaskRunnerConfigTest
objectMap.put("minWorkerVersion", minWorkerVersion);
objectMap.put("maxZnodeBytes", maxZnodeBytes);
objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout);
objectMap.put("pendingTasksRunnerNumThreads", pendingTasksRunnerNumThreads);
return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class);
}
}