diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java new file mode 100644 index 00000000000..f657b6a2435 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java @@ -0,0 +1,69 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord; + +import com.google.common.collect.ImmutableSet; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.worker.Worker; + +import java.util.Set; + +/** + * A snapshot of a {@link io.druid.indexing.overlord.ZkWorker} + */ +public class ImmutableZkWorker +{ + private final Worker worker; + private final int currCapacityUsed; + private final Set availabilityGroups; + + public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set availabilityGroups) + { + this.worker = worker; + this.currCapacityUsed = currCapacityUsed; + this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); + } + + public Worker getWorker() + { + return worker; + } + + public int getCurrCapacityUsed() + { + return currCapacityUsed; + } + + public Set getAvailabilityGroups() + { + return availabilityGroups; + } + + public boolean isValidVersion(String minVersion) + { + return worker.getVersion().compareTo(minVersion) >= 0; + } + + public boolean canRunTask(Task task) + { + return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() + && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 3d8de3c7001..e9bba479c64 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -25,14 +25,13 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.Maps; import com.google.common.io.InputSupplier; -import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -49,7 +48,7 @@ import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.server.initialization.ZkPathsConfig; @@ -70,10 +69,8 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -88,13 +85,13 @@ import java.util.concurrent.TimeUnit; * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running. * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup. * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK. - * + *

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * fail. The RemoteTaskRunner depends on another component to create additional worker resources. * For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties. - * + *

* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker. - * + *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer @@ -109,8 +106,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final CuratorFramework cf; private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; - private final Supplier workerSetupData; private final HttpClient httpClient; + private final WorkerSelectStrategy strategy; // all workers that exist in ZK private final ConcurrentMap zkWorkers = new ConcurrentHashMap<>(); @@ -135,8 +132,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ZkPathsConfig zkPaths, CuratorFramework cf, PathChildrenCacheFactory pathChildrenCacheFactory, - Supplier workerSetupData, - HttpClient httpClient + HttpClient httpClient, + WorkerSelectStrategy strategy ) { this.jsonMapper = jsonMapper; @@ -145,8 +142,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer this.cf = cf; this.pathChildrenCacheFactory = pathChildrenCacheFactory; this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath()); - this.workerSetupData = workerSetupData; this.httpClient = httpClient; + this.strategy = strategy; } @LifecycleStart @@ -524,11 +521,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - ZkWorker zkWorker = findWorkerForTask(task); - if (zkWorker != null) { + final Optional immutableZkWorker = strategy.findWorkerForTask( + ImmutableMap.copyOf( + Maps.transformEntries( + zkWorkers, + new Maps.EntryTransformer() + { + @Override + public ImmutableZkWorker transformEntry( + String key, ZkWorker value + ) + { + return value.toImmutable(); + } + } + ) + ), + task + ); + if (immutableZkWorker.isPresent()) { + final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); announceTask(task, zkWorker, taskRunnerWorkItem); return true; } else { + log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; } } @@ -789,37 +805,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } } - private ZkWorker findWorkerForTask(final Task task) - { - TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() - { - @Override - public int compare( - ZkWorker zkWorker, ZkWorker zkWorker2 - ) - { - int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); - if (retVal == 0) { - retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost()); - } - - return retVal; - } - } - ); - sortedWorkers.addAll(zkWorkers.values()); - final String minWorkerVer = config.getMinWorkerVersion(); - - for (ZkWorker zkWorker : sortedWorkers) { - if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { - return zkWorker; - } - } - log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); - return null; - } - private void taskComplete( RemoteTaskRunnerWorkItem taskRunnerWorkItem, ZkWorker zkWorker, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index aa7e1720e89..66306636e7f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -20,26 +20,25 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.guice.annotations.Global; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; /** -*/ + */ public class RemoteTaskRunnerFactory implements TaskRunnerFactory { private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; private final ZkPathsConfig zkPaths; private final ObjectMapper jsonMapper; - private final Supplier setupDataWatch; private final HttpClient httpClient; + private final WorkerSelectStrategy strategy; @Inject public RemoteTaskRunnerFactory( @@ -47,15 +46,16 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory final RemoteTaskRunnerConfig remoteTaskRunnerConfig, final ZkPathsConfig zkPaths, final ObjectMapper jsonMapper, - final Supplier setupDataWatch, - @Global final HttpClient httpClient - ) { + @Global final HttpClient httpClient, + final WorkerSelectStrategy strategy + ) + { this.curator = curator; this.remoteTaskRunnerConfig = remoteTaskRunnerConfig; this.zkPaths = zkPaths; this.jsonMapper = jsonMapper; - this.setupDataWatch = setupDataWatch; this.httpClient = httpClient; + this.strategy = strategy; } @Override @@ -70,8 +70,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory .Builder() .withCompressed(remoteTaskRunnerConfig.isCompressZnodes()) .build(), - setupDataWatch, - httpClient + httpClient, + strategy ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index 335b5fa583d..abc4da0ad57 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -158,6 +158,11 @@ public class ZkWorker implements Closeable lastCompletedTaskTime.getAndSet(completedTaskTime); } + public ImmutableZkWorker toImmutable() + { + return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups()); + } + @Override public void close() throws IOException { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java new file mode 100644 index 00000000000..db305136d43 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -0,0 +1,79 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord.setup; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.google.inject.Inject; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; + +import java.util.Comparator; +import java.util.TreeSet; + +/** + */ +public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy +{ + private final RemoteTaskRunnerConfig config; + + @Inject + public FillCapacityWorkerSelectStrategy(RemoteTaskRunnerConfig config) + { + this.config = config; + } + + public Optional findWorkerForTask( + final ImmutableMap zkWorkers, + final Task task + ) + { + TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() + { + @Override + public int compare( + ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2 + ) + { + int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); + if (retVal == 0) { + retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost()); + } + + return retVal; + } + } + ); + sortedWorkers.addAll(zkWorkers.values()); + final String minWorkerVer = config.getMinWorkerVersion(); + + for (ImmutableZkWorker zkWorker : sortedWorkers) { + if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { + return Optional.of(zkWorker); + } + } + + return Optional.absent(); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java new file mode 100644 index 00000000000..46e400e994e --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.ZkWorker; + +import java.util.Map; + +/** + * The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class) +}) +public interface WorkerSelectStrategy +{ + /** + * Customizable logic for selecting a worker to run a task. + * + * @param zkWorkers An immutable map of workers to choose from. + * @param task The task to assign. + * + * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. + */ + public Optional findWorkerForTask( + final ImmutableMap zkWorkers, + final Task task + ); +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 26aa7077c56..11c7c85c639 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -39,6 +39,8 @@ import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; @@ -367,9 +369,10 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { + RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(); remoteTaskRunner = new RemoteTaskRunner( jsonMapper, - new TestRemoteTaskRunnerConfig(), + config, new ZkPathsConfig() { @Override @@ -380,8 +383,8 @@ public class RemoteTaskRunnerTest }, cf, new SimplePathChildrenCacheFactory.Builder().build(), - DSuppliers.of(new AtomicReference(new WorkerSetupData(0, 1, null, null, null))), - null + null, + new FillCapacityWorkerSelectStrategy(config) ); remoteTaskRunner.start(); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 1fa46738042..9aa89fa11a2 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -70,6 +70,8 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl import io.druid.indexing.overlord.scaling.ResourceManagementStrategy; import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig; import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy; +import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -196,6 +198,20 @@ public class CliOverlord extends ServerRunnable biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + + PolyBind.createChoice( + binder, + "druid.indexer.runner.workerSelectStrategy.type", + Key.get(WorkerSelectStrategy.class), + Key.get(FillCapacityWorkerSelectStrategy.class) + ); + final MapBinder stratBinder = PolyBind.optionBinder( + binder, + Key.get(WorkerSelectStrategy.class) + ); + + stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class); + binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class); } private void configureAutoscale(Binder binder)