From bc156effe7ac917dd7d9046d007c757e170fa946 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 22 Feb 2016 15:42:15 -0600 Subject: [PATCH] RTR has multiple threads for assignment of pending tasks now. --- .../content/configuration/indexing-service.md | 1 + .../indexing/overlord/RemoteTaskRunner.java | 121 ++++++++----- .../config/RemoteTaskRunnerConfig.java | 47 ++++- .../config/RemoteTaskRunnerConfigTest.java | 171 ++++++++++++++---- 4 files changed, 248 insertions(+), 92 deletions(-) diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 10e49c2764b..1cc758fb400 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -82,6 +82,7 @@ The following configs only apply if the overlord is running in remote mode: |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M| |`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M| +|`druid.indexer.runner.pendingTasksRunnerNumThreads`|Number of threads to allocate pending-tasks to workers, must be at least 1.|3| There are additional configs for autoscaling (if it is enabled): diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 9dcdbe2c56e..209ffac3c26 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -54,6 +54,7 @@ import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.concurrent.Execs; import io.druid.curator.CuratorUtils; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskLocation; @@ -96,7 +97,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -143,7 +143,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // tasks that are complete but not cleaned up yet private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue(); - private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor(); + private final ExecutorService runPendingTasksExec; // Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy. private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>(); @@ -151,6 +151,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // task runner listeners private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + // workers which were assigned a task and are yet to acknowledge same. + // Map: workerId -> taskId + private final ConcurrentMap workersWithUnacknowledgedTask = new ConcurrentHashMap<>(); + // Map: taskId -> taskId .tasks which are being tried to be assigned to a worker + private final ConcurrentMap tryAssignTasks = new ConcurrentHashMap<>(); + private final Object statusLock = new Object(); private volatile boolean started = false; @@ -183,6 +189,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer this.workerConfigRef = workerConfigRef; this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec); this.resourceManagement = resourceManagement; + this.runPendingTasksExec = Execs.multiThreaded( + config.getPendingTasksRunnerNumThreads(), + "rtr-pending-tasks-runner-%d" + ); } @Override @@ -320,6 +330,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer zkWorker.close(); } workerPathCache.close(); + + if (runPendingTasksExec != null) { + runPendingTasksExec.shutdown(); + } } catch (Exception e) { throw Throwables.propagate(e); @@ -548,7 +562,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } /** - * This method uses a single threaded executor to extract all pending tasks and attempt to run them. Any tasks that + * This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe. * This method should be run each time there is new worker capacity or if new tasks are assigned. */ @@ -566,17 +580,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer List copy = Lists.newArrayList(pendingTasks.values()); for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) { String taskId = taskRunnerWorkItem.getTaskId(); - try { - if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) { - pendingTaskPayloads.remove(taskId); + if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) { + try { + if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) { + pendingTaskPayloads.remove(taskId); + } + } + catch (Exception e) { + log.makeAlert(e, "Exception while trying to assign task") + .addData("taskId", taskRunnerWorkItem.getTaskId()) + .emit(); + RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId); + taskComplete(workItem, null, TaskStatus.failure(taskId)); + } + finally { + tryAssignTasks.remove(taskId); } - } - catch (Exception e) { - log.makeAlert(e, "Exception while trying to assign task") - .addData("taskId", taskRunnerWorkItem.getTaskId()) - .emit(); - RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId); - taskComplete(workItem, null, TaskStatus.failure(taskId)); } } } @@ -650,40 +669,56 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } else { strategy = workerConfig.getSelectStrategy(); } - final Optional immutableZkWorker = strategy.findWorkerForTask( - config, - ImmutableMap.copyOf( - Maps.transformEntries( - Maps.filterEntries( - zkWorkers, new Predicate>() - { - @Override - public boolean apply(Map.Entry input) + + ZkWorker assignedWorker = null; + try { + final Optional immutableZkWorker = strategy.findWorkerForTask( + config, + ImmutableMap.copyOf( + Maps.transformEntries( + Maps.filterEntries( + zkWorkers, new Predicate>() { - return !lazyWorkers.containsKey(input.getKey()); + @Override + public boolean apply(Map.Entry input) + { + return !lazyWorkers.containsKey(input.getKey()) && + !workersWithUnacknowledgedTask.containsKey(input.getKey()); + } } - } - ), - new Maps.EntryTransformer() - { - @Override - public ImmutableZkWorker transformEntry( - String key, ZkWorker value - ) + ), + new Maps.EntryTransformer() { - return value.toImmutable(); + @Override + public ImmutableZkWorker transformEntry( + String key, ZkWorker value + ) + { + return value.toImmutable(); + } } - } - ) - ), - task - ); - if (immutableZkWorker.isPresent()) { - final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); - return announceTask(task, zkWorker, taskRunnerWorkItem); - } else { + ) + ), + task + ); + + if (immutableZkWorker.isPresent() + && + workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId()) + == null) { + assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); + return announceTask(task, assignedWorker, taskRunnerWorkItem); + } + log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; + } finally { + if (assignedWorker != null) { + workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost()); + // note that this is essential as a task might not get a worker because a worker was assigned another task. + // so this will ensure that other pending tasks are tried for assignment again. + runPendingTasks(); + } } } } @@ -741,8 +776,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer statusLock.wait(waitMs); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); if (elapsed >= waitMs) { - log.error( - "Something went wrong! [%s] never ran task [%s]! Timeout: (%s >= %s)!", + log.makeAlert( + "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!", worker, task.getId(), elapsed, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index fce9c8fbf7e..f6971a5d187 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -48,6 +48,10 @@ public class RemoteTaskRunnerConfig @JsonProperty private Period taskShutdownLinkTimeout = new Period("PT1M"); + @JsonProperty + @Min(1) + private int pendingTasksRunnerNumThreads = 3; + public Period getTaskAssignmentTimeout() { return taskAssignmentTimeout; @@ -73,6 +77,12 @@ public class RemoteTaskRunnerConfig return taskShutdownLinkTimeout; } + + public int getPendingTasksRunnerNumThreads() + { + return pendingTasksRunnerNumThreads; + } + @Override public boolean equals(Object o) { @@ -85,30 +95,47 @@ public class RemoteTaskRunnerConfig RemoteTaskRunnerConfig that = (RemoteTaskRunnerConfig) o; - if (getMaxZnodeBytes() != that.getMaxZnodeBytes()) { + if (maxZnodeBytes != that.maxZnodeBytes) { return false; } - if (!getTaskAssignmentTimeout().equals(that.getTaskAssignmentTimeout())) { + if (pendingTasksRunnerNumThreads != that.pendingTasksRunnerNumThreads) { return false; } - if (!getTaskCleanupTimeout().equals(that.getTaskCleanupTimeout())) { + if (!taskAssignmentTimeout.equals(that.taskAssignmentTimeout)) { return false; } - if (!getMinWorkerVersion().equals(that.getMinWorkerVersion())) { + if (!taskCleanupTimeout.equals(that.taskCleanupTimeout)) { return false; } - return getTaskShutdownLinkTimeout().equals(that.getTaskShutdownLinkTimeout()); + if (!minWorkerVersion.equals(that.minWorkerVersion)) { + return false; + } + return taskShutdownLinkTimeout.equals(that.taskShutdownLinkTimeout); } @Override public int hashCode() { - int result = getTaskAssignmentTimeout().hashCode(); - result = 31 * result + getTaskCleanupTimeout().hashCode(); - result = 31 * result + getMinWorkerVersion().hashCode(); - result = 31 * result + getMaxZnodeBytes(); - result = 31 * result + getTaskShutdownLinkTimeout().hashCode(); + int result = taskAssignmentTimeout.hashCode(); + result = 31 * result + taskCleanupTimeout.hashCode(); + result = 31 * result + minWorkerVersion.hashCode(); + result = 31 * result + maxZnodeBytes; + result = 31 * result + taskShutdownLinkTimeout.hashCode(); + result = 31 * result + pendingTasksRunnerNumThreads; return result; } + + @Override + public String toString() + { + return "RemoteTaskRunnerConfig{" + + "taskAssignmentTimeout=" + taskAssignmentTimeout + + ", taskCleanupTimeout=" + taskCleanupTimeout + + ", minWorkerVersion='" + minWorkerVersion + '\'' + + ", maxZnodeBytes=" + maxZnodeBytes + + ", taskShutdownLinkTimeout=" + taskShutdownLinkTimeout + + ", pendingTasksRunnerNumThreads=" + pendingTasksRunnerNumThreads + + '}'; + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java index 24927d25e37..f533ab56e41 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java @@ -35,6 +35,7 @@ public class RemoteTaskRunnerConfigTest private static final Period DEFAULT_TIMEOUT = Period.ZERO; private static final String DEFAULT_VERSION = ""; private static final long DEFAULT_MAX_ZNODE = 10 * 1024; + private static final int DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS = 5; @Test public void testGetTaskAssignmentTimeout() throws Exception @@ -47,24 +48,26 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).getTaskAssignmentTimeout() ); } @Test - public void testGetTaskCleanupTimeout() throws Exception + public void testGetPendingTasksRunnerNumThreads() throws Exception { - final Period timeout = Period.hours(1); + final int pendingTasksRunnerNumThreads = 20; Assert.assertEquals( - timeout, + pendingTasksRunnerNumThreads, reflect(generateRemoteTaskRunnerConfig( DEFAULT_TIMEOUT, - timeout, + DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT - )).getTaskCleanupTimeout() + DEFAULT_TIMEOUT, + pendingTasksRunnerNumThreads + )).getPendingTasksRunnerNumThreads() ); } @@ -79,7 +82,8 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, version, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).getMinWorkerVersion() ); } @@ -95,7 +99,8 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, DEFAULT_VERSION, max, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).getMaxZnodeBytes() ); } @@ -111,11 +116,29 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - timeout + timeout, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).getTaskShutdownLinkTimeout() ); } + @Test + public void testGetTaskCleanupTimeout() throws Exception + { + final Period timeout = Period.hours(1); + Assert.assertEquals( + timeout, + reflect(generateRemoteTaskRunnerConfig( + DEFAULT_TIMEOUT, + timeout, + DEFAULT_VERSION, + DEFAULT_MAX_ZNODE, + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS + )).getTaskCleanupTimeout() + ); + } + @Test public void testEquals() throws Exception { @@ -125,33 +148,38 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )), reflect(generateRemoteTaskRunnerConfig( DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )) ); final Period timeout = Period.years(999); final String version = "someVersion"; final long max = 20 * 1024; + final int pendingTasksRunnerNumThreads = 20; Assert.assertEquals( reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )) ); Assert.assertNotEquals( @@ -160,14 +188,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( DEFAULT_TIMEOUT, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )) ); Assert.assertNotEquals( @@ -176,14 +206,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( timeout, DEFAULT_TIMEOUT, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )) ); Assert.assertNotEquals( @@ -192,14 +224,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, DEFAULT_VERSION, max, - timeout + timeout, + pendingTasksRunnerNumThreads )) ); @@ -209,14 +243,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, DEFAULT_MAX_ZNODE, - timeout + timeout, + pendingTasksRunnerNumThreads )) ); @@ -227,16 +263,37 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + pendingTasksRunnerNumThreads )) ); + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout, + pendingTasksRunnerNumThreads + )), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS + )) + ); } @Test @@ -248,33 +305,38 @@ public class RemoteTaskRunnerConfigTest DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).hashCode(), reflect(generateRemoteTaskRunnerConfig( DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_VERSION, DEFAULT_MAX_ZNODE, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS )).hashCode() ); final Period timeout = Period.years(999); final String version = "someVersion"; final long max = 20 * 1024; + final int pendingTasksRunnerNumThreads = 20; Assert.assertEquals( reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode() ); Assert.assertNotEquals( @@ -283,14 +345,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( DEFAULT_TIMEOUT, timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode() ); Assert.assertNotEquals( @@ -299,14 +363,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( timeout, DEFAULT_TIMEOUT, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode() ); Assert.assertNotEquals( @@ -315,14 +381,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, DEFAULT_VERSION, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode() ); @@ -332,14 +400,16 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, DEFAULT_MAX_ZNODE, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode() ); @@ -350,16 +420,37 @@ public class RemoteTaskRunnerConfigTest timeout, version, max, - timeout + timeout, + pendingTasksRunnerNumThreads )).hashCode(), reflect(generateRemoteTaskRunnerConfig( timeout, timeout, version, max, - DEFAULT_TIMEOUT + DEFAULT_TIMEOUT, + pendingTasksRunnerNumThreads )).hashCode() ); + + Assert.assertNotEquals( + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout, + pendingTasksRunnerNumThreads + )).hashCode(), + reflect(generateRemoteTaskRunnerConfig( + timeout, + timeout, + version, + max, + timeout, + DEFAULT_PENDING_TASKS_RUNNER_NUM_THREADS + )).hashCode() + ); } private RemoteTaskRunnerConfig reflect(RemoteTaskRunnerConfig config) throws IOException @@ -372,7 +463,8 @@ public class RemoteTaskRunnerConfigTest Period taskCleanupTimeout, String minWorkerVersion, long maxZnodeBytes, - Period taskShutdownLinkTimeout + Period taskShutdownLinkTimeout, + int pendingTasksRunnerNumThreads ) { final Map objectMap = new HashMap<>(); @@ -381,6 +473,7 @@ public class RemoteTaskRunnerConfigTest objectMap.put("minWorkerVersion", minWorkerVersion); objectMap.put("maxZnodeBytes", maxZnodeBytes); objectMap.put("taskShutdownLinkTimeout", taskShutdownLinkTimeout); + objectMap.put("pendingTasksRunnerNumThreads", pendingTasksRunnerNumThreads); return mapper.convertValue(objectMap, RemoteTaskRunnerConfig.class); } }