diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 9c37d4af26f..cf9f2f0f9e4 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1066,6 +1066,7 @@ The following configs only apply if the Overlord is running in remote mode. For |--------|-----------|-------| |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a MiddleManager before throwing an error.|PT5M| |`druid.indexer.runner.minWorkerVersion`|The minimum MiddleManager version to send tasks to. |"0"| +| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range [0, 1]. |1| |`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect MiddleManagers to compress Znodes.|true| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.| 512 KiB | |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a MiddleManager is disconnected from Zookeeper.|PT15M| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index 2c077581b04..aaea3f453d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.worker.Worker; import org.joda.time.DateTime; @@ -39,6 +40,7 @@ public class ImmutableWorkerInfo { private final Worker worker; private final int currCapacityUsed; + private final int currParallelIndexCapacityUsed; private final ImmutableSet availabilityGroups; private final ImmutableSet runningTasks; private final DateTime lastCompletedTaskTime; @@ -48,6 +50,7 @@ public class ImmutableWorkerInfo public ImmutableWorkerInfo( @JsonProperty("worker") Worker worker, @JsonProperty("currCapacityUsed") int currCapacityUsed, + @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed, @JsonProperty("availabilityGroups") Set availabilityGroups, @JsonProperty("runningTasks") Collection runningTasks, @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime, @@ -56,12 +59,26 @@ public class ImmutableWorkerInfo { this.worker = worker; this.currCapacityUsed = currCapacityUsed; + this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed; this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); this.runningTasks = ImmutableSet.copyOf(runningTasks); this.lastCompletedTaskTime = lastCompletedTaskTime; this.blacklistedUntil = blacklistedUntil; } + public ImmutableWorkerInfo( + Worker worker, + int currCapacityUsed, + int currParallelIndexCapacityUsed, + Set availabilityGroups, + Collection runningTasks, + DateTime lastCompletedTaskTime + ) + { + this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups, + runningTasks, lastCompletedTaskTime, null); + } + public ImmutableWorkerInfo( Worker worker, int currCapacityUsed, @@ -70,7 +87,7 @@ public class ImmutableWorkerInfo DateTime lastCompletedTaskTime ) { - this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null); + this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null); } @JsonProperty("worker") @@ -85,6 +102,12 @@ public class ImmutableWorkerInfo return currCapacityUsed; } + @JsonProperty("currParallelIndexCapacityUsed") + public int getCurrParallelIndexCapacityUsed() + { + return currParallelIndexCapacityUsed; + } + @JsonProperty("availabilityGroups") public Set getAvailabilityGroups() { @@ -119,12 +142,36 @@ public class ImmutableWorkerInfo return worker.getVersion().compareTo(minVersion) >= 0; } - public boolean canRunTask(Task task) + public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio) { return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() + && canRunParallelIndexTask(task, parallelIndexTaskSlotRatio) && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); } + private boolean canRunParallelIndexTask(Task task, double parallelIndexTaskSlotRatio) + { + if (!task.getType().equals(ParallelIndexSupervisorTask.TYPE)) { + return true; + } + return getWorkerParallelIndexCapacity(parallelIndexTaskSlotRatio) - getCurrParallelIndexCapacityUsed() + >= task.getTaskResource().getRequiredCapacity(); + + } + + private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio) + { + int totalCapacity = worker.getCapacity(); + int workerParallelIndexCapacity = (int) Math.floor(parallelIndexTaskSlotRatio * totalCapacity); + if (workerParallelIndexCapacity < 1) { + workerParallelIndexCapacity = 1; + } + if (workerParallelIndexCapacity > totalCapacity) { + workerParallelIndexCapacity = totalCapacity; + } + return workerParallelIndexCapacity; + } + @Override public boolean equals(Object o) { @@ -140,6 +187,9 @@ public class ImmutableWorkerInfo if (currCapacityUsed != that.currCapacityUsed) { return false; } + if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) { + return false; + } if (!worker.equals(that.worker)) { return false; } @@ -162,6 +212,7 @@ public class ImmutableWorkerInfo { int result = worker.hashCode(); result = 31 * result + currCapacityUsed; + result = 31 * result + currParallelIndexCapacityUsed; result = 31 * result + availabilityGroups.hashCode(); result = 31 * result + runningTasks.hashCode(); result = 31 * result + lastCompletedTaskTime.hashCode(); @@ -175,6 +226,7 @@ public class ImmutableWorkerInfo return "ImmutableWorkerInfo{" + "worker=" + worker + ", currCapacityUsed=" + currCapacityUsed + + ", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed + ", availabilityGroups=" + availabilityGroups + ", runningTasks=" + runningTasks + ", lastCompletedTaskTime=" + lastCompletedTaskTime + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java index 2be16ae57d3..a875090046e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.druid.annotations.UsedInGeneratedCode; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.java.util.common.DateTimes; @@ -110,6 +111,18 @@ public class ZkWorker implements Closeable return currCapacity; } + @JsonProperty("currParallelIndexCapacityUsed") + public int getCurrParallelIndexCapacityUsed() + { + int currParallelIndexCapacityUsed = 0; + for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) { + currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity(); + } + } + return currParallelIndexCapacityUsed; + } + @JsonProperty("availabilityGroups") public Set getAvailabilityGroups() { @@ -168,6 +181,7 @@ public class ZkWorker implements Closeable return new ImmutableWorkerInfo( worker.get(), getCurrCapacityUsed(), + getCurrParallelIndexCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 7d83b3d0e87..1c9ba5f4866 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -31,6 +31,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; @@ -477,9 +478,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task) { + int parallelIndexTaskCapacity = task.getType().equals(ParallelIndexSupervisorTask.TYPE) + ? task.getTaskResource().getRequiredCapacity() + : 0; return new ImmutableWorkerInfo( immutableWorker.getWorker(), immutableWorker.getCurrCapacityUsed() + 1, + immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity, Sets.union( immutableWorker.getAvailabilityGroups(), Sets.newHashSet( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java index c7f13454f06..d916b3ea582 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java @@ -26,8 +26,26 @@ public class WorkerTaskRunnerConfig @JsonProperty private String minWorkerVersion = "0"; + @JsonProperty + private double parallelIndexTaskSlotRatio = 1; + public String getMinWorkerVersion() { return minWorkerVersion; } + + /** + * The number of task slots that a parallel indexing task can take is restricted using this config as a multiplier + * + * A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour) + * A value of 0 means ParallelIndexSupervisorTasks can occupy no slots. + * Deadlocks can occur if the all task slots are occupied by ParallelIndexSupervisorTasks, + * as no subtask would ever get a slot. Set this config to a value < 1 to prevent deadlocks. + * + * @return ratio of task slots available to a parallel indexing task at a worker level + */ + public double getParallelIndexTaskSlotRatio() + { + return parallelIndexTaskSlotRatio; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index ff9aab1bfe4..0bf4de0df86 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; @@ -149,6 +150,17 @@ public class WorkerHolder return currCapacity; } + private int getCurrParallelIndexCapcityUsed() + { + int currParallelIndexCapacityUsed = 0; + for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) { + if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) { + currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity(); + } + } + return currParallelIndexCapacityUsed; + } + private Set getAvailabilityGroups() { Set retVal = new HashSet<>(); @@ -193,6 +205,7 @@ public class WorkerHolder return new ImmutableWorkerInfo( w, getCurrCapacityUsed(), + getCurrParallelIndexCapcityUsed(), getAvailabilityGroups(), getRunningTasks().keySet(), lastCompletedTaskTime.get(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java index 24721e85d06..c3832daae32 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java @@ -150,7 +150,7 @@ public class WorkerSelectUtils { return allWorkers.values() .stream() - .filter(worker -> worker.canRunTask(task) + .filter(worker -> worker.canRunTask(task, workerTaskRunnerConfig.getParallelIndexTaskSlotRatio()) && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion())) .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity())); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java index 785698e68f0..e373eb982a3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java @@ -22,6 +22,10 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.jackson.DefaultObjectMapper; @@ -29,6 +33,9 @@ import org.apache.druid.java.util.common.DateTimes; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ImmutableWorkerInfoTest { @Test @@ -193,6 +200,7 @@ public class ImmutableWorkerInfoTest "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 3, + 0, ImmutableSet.of("grp1", "grp2"), ImmutableSet.of("task1", "task2"), DateTimes.of("2015-01-01T01:01:01Z"), @@ -202,6 +210,7 @@ public class ImmutableWorkerInfoTest "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY ), 2, + 0, ImmutableSet.of("grp1", "grp2"), ImmutableSet.of("task1", "task2"), DateTimes.of("2015-01-01T01:01:02Z"), @@ -209,6 +218,57 @@ public class ImmutableWorkerInfoTest ), false); } + @Test + public void test_canRunTask() + { + ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo( + new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY), + 6, + 0, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + DateTimes.of("2015-01-01T01:01:02Z") + ); + + + // Parallel index task + TaskResource taskResource0 = mock(TaskResource.class); + when(taskResource0.getRequiredCapacity()).thenReturn(3); + Task parallelIndexTask = mock(ParallelIndexSupervisorTask.class); + when(parallelIndexTask.getType()).thenReturn(ParallelIndexSupervisorTask.TYPE); + when(parallelIndexTask.getTaskResource()).thenReturn(taskResource0); + + // Since task satisifies parallel and total slot constraints, can run + Assert.assertTrue(workerInfo.canRunTask(parallelIndexTask, 0.5)); + + // Since task fails the parallel slot constraint, it cannot run (3 > 1) + Assert.assertFalse(workerInfo.canRunTask(parallelIndexTask, 0.1)); + + + // Some other indexing task + TaskResource taskResource1 = mock(TaskResource.class); + when(taskResource1.getRequiredCapacity()).thenReturn(5); + Task anyOtherTask = mock(IndexTask.class); + when(anyOtherTask.getType()).thenReturn("index"); + when(anyOtherTask.getTaskResource()).thenReturn(taskResource1); + + // Not a parallel index task -> satisfies parallel index constraint + // But does not satisfy the total slot constraint and cannot run (11 > 10) + Assert.assertFalse(workerInfo.canRunTask(anyOtherTask, 0.5)); + + + // Task has an availability conflict ("grp1") + TaskResource taskResource2 = mock(TaskResource.class); + when(taskResource2.getRequiredCapacity()).thenReturn(1); + when(taskResource2.getAvailabilityGroup()).thenReturn("grp1"); + Task grp1Task = mock(IndexTask.class); + when(grp1Task.getType()).thenReturn("blah"); + when(grp1Task.getTaskResource()).thenReturn(taskResource2); + + // Satisifies parallel index and total index slot constraints but cannot run due availability + Assert.assertFalse(workerInfo.canRunTask(grp1Task, 0.3)); + } + private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch) { if (shouldMatch) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java index 8717edca6bb..c767bf64c7a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java @@ -57,10 +57,11 @@ public class JavaScriptWorkerSelectStrategyTest + "}\n" + "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n" + "var minWorkerVer = config.getMinWorkerVersion();\n" + + "var parallelIndexTaskSlotRatio = config.getParallelIndexTaskSlotRatio();\n" + "for (var i = 0; i < sortedWorkers.length; i++) {\n" + " var worker = sortedWorkers[i];\n" + " var zkWorker = zkWorkers.get(worker);\n" - + " if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {\n" + + " if (zkWorker.canRunTask(task, parallelIndexTaskSlotRatio) && zkWorker.isValidVersion(minWorkerVer)) {\n" + " if (task.getType() == 'index_hadoop' && batch_workers.contains(worker)) {\n" + " return worker;\n" + " } else {\n" @@ -238,8 +239,9 @@ public class JavaScriptWorkerSelectStrategyTest private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion) { ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class); - EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes(); + EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class), EasyMock.anyDouble())).andReturn(canRunTask).anyTimes(); EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes(); + EasyMock.expect(worker.getCurrParallelIndexCapacityUsed()).andReturn(0).anyTimes(); EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes(); EasyMock.replay(worker); return worker;