From 4a09678739b25ad8334f7104671c165f436c1436 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Sep 2014 14:24:02 -0700 Subject: [PATCH 1/8] make the selection strategy in rtr extendable --- .../indexing/overlord/RemoteTaskRunner.java | 52 ++---------- .../overlord/RemoteTaskRunnerFactory.java | 20 ++--- .../FillCapacityWorkerSelectStrategy.java | 80 +++++++++++++++++++ .../overlord/setup/WorkerSelectStrategy.java | 39 +++++++++ .../overlord/RemoteTaskRunnerTest.java | 9 ++- .../main/java/io/druid/cli/CliOverlord.java | 16 ++++ 6 files changed, 159 insertions(+), 57 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 3d8de3c7001..00285e8a539 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -25,14 +25,11 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; -import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -49,7 +46,7 @@ import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.server.initialization.ZkPathsConfig; @@ -70,10 +67,8 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -109,8 +104,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final CuratorFramework cf; private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; - private final Supplier workerSetupData; private final HttpClient httpClient; + private final WorkerSelectStrategy strategy; // all workers that exist in ZK private final ConcurrentMap zkWorkers = new ConcurrentHashMap<>(); @@ -135,8 +130,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ZkPathsConfig zkPaths, CuratorFramework cf, PathChildrenCacheFactory pathChildrenCacheFactory, - Supplier workerSetupData, - HttpClient httpClient + HttpClient httpClient, + WorkerSelectStrategy strategy ) { this.jsonMapper = jsonMapper; @@ -145,8 +140,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer this.cf = cf; this.pathChildrenCacheFactory = pathChildrenCacheFactory; this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath()); - this.workerSetupData = workerSetupData; this.httpClient = httpClient; + this.strategy = strategy; } @LifecycleStart @@ -524,9 +519,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - ZkWorker zkWorker = findWorkerForTask(task); - if (zkWorker != null) { - announceTask(task, zkWorker, taskRunnerWorkItem); + Optional zkWorker = strategy.findWorkerForTask(zkWorkers, task); + if (zkWorker.isPresent()) { + announceTask(task, zkWorker.get(), taskRunnerWorkItem); return true; } else { return false; @@ -789,37 +784,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } } - private ZkWorker findWorkerForTask(final Task task) - { - TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() - { - @Override - public int compare( - ZkWorker zkWorker, ZkWorker zkWorker2 - ) - { - int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); - if (retVal == 0) { - retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost()); - } - - return retVal; - } - } - ); - sortedWorkers.addAll(zkWorkers.values()); - final String minWorkerVer = config.getMinWorkerVersion(); - - for (ZkWorker zkWorker : sortedWorkers) { - if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { - return zkWorker; - } - } - log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); - return null; - } - private void taskComplete( RemoteTaskRunnerWorkItem taskRunnerWorkItem, ZkWorker zkWorker, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index aa7e1720e89..66306636e7f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -20,26 +20,25 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.guice.annotations.Global; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; /** -*/ + */ public class RemoteTaskRunnerFactory implements TaskRunnerFactory { private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; private final ZkPathsConfig zkPaths; private final ObjectMapper jsonMapper; - private final Supplier setupDataWatch; private final HttpClient httpClient; + private final WorkerSelectStrategy strategy; @Inject public RemoteTaskRunnerFactory( @@ -47,15 +46,16 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory final RemoteTaskRunnerConfig remoteTaskRunnerConfig, final ZkPathsConfig zkPaths, final ObjectMapper jsonMapper, - final Supplier setupDataWatch, - @Global final HttpClient httpClient - ) { + @Global final HttpClient httpClient, + final WorkerSelectStrategy strategy + ) + { this.curator = curator; this.remoteTaskRunnerConfig = remoteTaskRunnerConfig; this.zkPaths = zkPaths; this.jsonMapper = jsonMapper; - this.setupDataWatch = setupDataWatch; this.httpClient = httpClient; + this.strategy = strategy; } @Override @@ -70,8 +70,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory .Builder() .withCompressed(remoteTaskRunnerConfig.isCompressZnodes()) .build(), - setupDataWatch, - httpClient + httpClient, + strategy ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java new file mode 100644 index 00000000000..67fc661f6c0 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -0,0 +1,80 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord.setup; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ZkWorker; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeSet; + +/** + */ +public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy +{ + private static final EmittingLogger log = new EmittingLogger(FillCapacityWorkerSelectStrategy.class); + + private final RemoteTaskRunnerConfig config; + + @Inject + public FillCapacityWorkerSelectStrategy(RemoteTaskRunnerConfig config) + { + this.config = config; + } + + public Optional findWorkerForTask(final Map zkWorkers, final Task task) + { + TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() + { + @Override + public int compare( + ZkWorker zkWorker, ZkWorker zkWorker2 + ) + { + int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); + if (retVal == 0) { + retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost()); + } + + return retVal; + } + } + ); + sortedWorkers.addAll(zkWorkers.values()); + final String minWorkerVer = config.getMinWorkerVersion(); + + for (ZkWorker zkWorker : sortedWorkers) { + if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { + return Optional.of(zkWorker); + } + } + log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); + + return Optional.absent(); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java new file mode 100644 index 00000000000..78e09f4b650 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -0,0 +1,39 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Optional; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ZkWorker; + +import java.util.Map; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class) +}) +public interface WorkerSelectStrategy +{ + public Optional findWorkerForTask(final Map zkWorkers, final Task task); +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 26aa7077c56..11c7c85c639 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -39,6 +39,8 @@ import io.druid.indexing.common.TestRealtimeTask; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; @@ -367,9 +369,10 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { + RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(); remoteTaskRunner = new RemoteTaskRunner( jsonMapper, - new TestRemoteTaskRunnerConfig(), + config, new ZkPathsConfig() { @Override @@ -380,8 +383,8 @@ public class RemoteTaskRunnerTest }, cf, new SimplePathChildrenCacheFactory.Builder().build(), - DSuppliers.of(new AtomicReference(new WorkerSetupData(0, 1, null, null, null))), - null + null, + new FillCapacityWorkerSelectStrategy(config) ); remoteTaskRunner.start(); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 1fa46738042..9aa89fa11a2 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -70,6 +70,8 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl import io.druid.indexing.overlord.scaling.ResourceManagementStrategy; import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig; import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy; +import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; +import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -196,6 +198,20 @@ public class CliOverlord extends ServerRunnable biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + + PolyBind.createChoice( + binder, + "druid.indexer.runner.workerSelectStrategy.type", + Key.get(WorkerSelectStrategy.class), + Key.get(FillCapacityWorkerSelectStrategy.class) + ); + final MapBinder stratBinder = PolyBind.optionBinder( + binder, + Key.get(WorkerSelectStrategy.class) + ); + + stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class); + binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class); } private void configureAutoscale(Binder binder) From 55db06ccb108c7e57bab0d6d516862a34d818ad3 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 10:29:02 -0700 Subject: [PATCH 2/8] address cr --- .../indexing/overlord/ImmutableZkWorker.java | 78 +++++++++++++++++++ .../indexing/overlord/RemoteTaskRunner.java | 28 ++++++- .../io/druid/indexing/overlord/ZkWorker.java | 5 ++ .../FillCapacityWorkerSelectStrategy.java | 18 +++-- .../overlord/setup/WorkerSelectStrategy.java | 16 +++- 5 files changed, 133 insertions(+), 12 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java new file mode 100644 index 00000000000..893ca3d9a72 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java @@ -0,0 +1,78 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord; + +import com.google.common.collect.ImmutableSet; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.worker.Worker; + +import java.util.Set; + +/** + * A snapshot of a {@link io.druid.indexing.overlord.ZkWorker} + */ +public class ImmutableZkWorker +{ + private final ZkWorker mutableZkWorker; + private final Worker worker; + private final int currCapacityUsed; + private final Set availabilityGroups; + + public ImmutableZkWorker( + ZkWorker mutableZkWorker + ) + { + this.mutableZkWorker = mutableZkWorker; + this.worker = mutableZkWorker.getWorker(); + this.currCapacityUsed = mutableZkWorker.getCurrCapacityUsed(); + this.availabilityGroups = ImmutableSet.copyOf(mutableZkWorker.getAvailabilityGroups()); + } + + public ZkWorker getMutableZkWorker() + { + return mutableZkWorker; + } + + public Worker getWorker() + { + return worker; + } + + public int getCurrCapacityUsed() + { + return currCapacityUsed; + } + + public Set getAvailabilityGroups() + { + return availabilityGroups; + } + + public boolean isValidVersion(String minVersion) + { + return worker.getVersion().compareTo(minVersion) >= 0; + } + + public boolean canRunTask(Task task) + { + return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() + && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); + } +} \ No newline at end of file diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 00285e8a539..df14f4f1ac0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -27,8 +27,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.InputSupplier; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -62,6 +64,7 @@ import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -83,13 +86,13 @@ import java.util.concurrent.TimeUnit; * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running. * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup. * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK. - * + *

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * fail. The RemoteTaskRunner depends on another component to create additional worker resources. * For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties. - * + *

* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker. - * + *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer @@ -519,7 +522,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - Optional zkWorker = strategy.findWorkerForTask(zkWorkers, task); + Optional zkWorker = strategy.findWorkerForTask( + ImmutableMap.copyOf( + Maps.transformEntries( + zkWorkers, + new Maps.EntryTransformer() + { + @Override + public ImmutableZkWorker transformEntry( + String key, ZkWorker value + ) + { + return value.toImmutable(); + } + } + ) + ), + task + ); if (zkWorker.isPresent()) { announceTask(task, zkWorker.get(), taskRunnerWorkItem); return true; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index 335b5fa583d..c5bdc254193 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -158,6 +158,11 @@ public class ZkWorker implements Closeable lastCompletedTaskTime.getAndSet(completedTaskTime); } + public ImmutableZkWorker toImmutable() + { + return new ImmutableZkWorker(this); + } + @Override public void close() throws IOException { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 67fc661f6c0..a72da100764 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -20,16 +20,17 @@ package io.druid.indexing.overlord.setup; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import java.util.Comparator; -import java.util.Map; import java.util.TreeSet; /** @@ -46,14 +47,17 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy this.config = config; } - public Optional findWorkerForTask(final Map zkWorkers, final Task task) + public Optional findWorkerForTask( + final ImmutableMap zkWorkers, + final Task task + ) { - TreeSet sortedWorkers = Sets.newTreeSet( - new Comparator() + TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() { @Override public int compare( - ZkWorker zkWorker, ZkWorker zkWorker2 + ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2 ) { int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); @@ -68,9 +72,9 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy sortedWorkers.addAll(zkWorkers.values()); final String minWorkerVer = config.getMinWorkerVersion(); - for (ZkWorker zkWorker : sortedWorkers) { + for (ImmutableZkWorker zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { - return Optional.of(zkWorker); + return Optional.of(zkWorker.getMutableZkWorker()); } } log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 78e09f4b650..6a627cb25f6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -22,12 +22,15 @@ package io.druid.indexing.overlord.setup; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; import io.druid.indexing.overlord.ZkWorker; import java.util.Map; /** + * The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @@ -35,5 +38,16 @@ import java.util.Map; }) public interface WorkerSelectStrategy { - public Optional findWorkerForTask(final Map zkWorkers, final Task task); + /** + * Customizable logic for selecting a worker to run a task. + * + * @param zkWorkers An immutable map of workers to choose from. + * @param task The task to assign. + * + * @return A {@link io.druid.indexing.overlord.ZkWorker} to run the task if one is available. + */ + public Optional findWorkerForTask( + final ImmutableMap zkWorkers, + final Task task + ); } From 4c23a5e9f655cf32b22048c96e49c94098d1f345 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:40:29 -0700 Subject: [PATCH 3/8] address cr again --- .../indexing/overlord/ImmutableZkWorker.java | 17 ++++------------- .../indexing/overlord/RemoteTaskRunner.java | 8 ++++---- .../overlord/setup/WorkerSelectStrategy.java | 4 ++-- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java index 893ca3d9a72..51d05d222d0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java @@ -30,24 +30,15 @@ import java.util.Set; */ public class ImmutableZkWorker { - private final ZkWorker mutableZkWorker; private final Worker worker; private final int currCapacityUsed; private final Set availabilityGroups; - public ImmutableZkWorker( - ZkWorker mutableZkWorker - ) + public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set availabilityGroups) { - this.mutableZkWorker = mutableZkWorker; - this.worker = mutableZkWorker.getWorker(); - this.currCapacityUsed = mutableZkWorker.getCurrCapacityUsed(); - this.availabilityGroups = ImmutableSet.copyOf(mutableZkWorker.getAvailabilityGroups()); - } - - public ZkWorker getMutableZkWorker() - { - return mutableZkWorker; + this.worker = worker; + this.currCapacityUsed = currCapacityUsed; + this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups); } public Worker getWorker() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index df14f4f1ac0..7dc1322a366 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -64,7 +64,6 @@ import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -522,7 +521,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it - Optional zkWorker = strategy.findWorkerForTask( + final Optional immutableZkWorker = strategy.findWorkerForTask( ImmutableMap.copyOf( Maps.transformEntries( zkWorkers, @@ -540,8 +539,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer ), task ); - if (zkWorker.isPresent()) { - announceTask(task, zkWorker.get(), taskRunnerWorkItem); + if (immutableZkWorker.isPresent()) { + final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost()); + announceTask(task, zkWorker, taskRunnerWorkItem); return true; } else { return false; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 6a627cb25f6..80103b6b721 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -44,9 +44,9 @@ public interface WorkerSelectStrategy * @param zkWorkers An immutable map of workers to choose from. * @param task The task to assign. * - * @return A {@link io.druid.indexing.overlord.ZkWorker} to run the task if one is available. + * @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available. */ - public Optional findWorkerForTask( + public Optional findWorkerForTask( final ImmutableMap zkWorkers, final Task task ); From 575d51b0ce4182cb6712001e121358c103dfd26c Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:44:50 -0700 Subject: [PATCH 4/8] fix compilation error --- .../overlord/setup/FillCapacityWorkerSelectStrategy.java | 2 +- .../src/main/java/io/druid/indexing/worker/Worker.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index a72da100764..7458030d9fc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -47,7 +47,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy this.config = config; } - public Optional findWorkerForTask( + public Optional findWorkerForTask( final ImmutableMap zkWorkers, final Task task ) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java index ac41aa7292a..c706122e649 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java @@ -76,7 +76,7 @@ public class Worker return "Worker{" + "host='" + host + '\'' + ", ip='" + ip + '\'' + - ", capacity=" + capacity + + ", capacity=" + capacity + ", version='" + version + '\'' + '}'; } From b1b9e0a26774444b50697f400af7fc648be08780 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:45:19 -0700 Subject: [PATCH 5/8] i suck --- .../overlord/setup/FillCapacityWorkerSelectStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 7458030d9fc..59476b46e6d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -74,7 +74,7 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy for (ImmutableZkWorker zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { - return Optional.of(zkWorker.getMutableZkWorker()); + return Optional.of(zkWorker); } } log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); From 2b2b028e5c774b1912ba59ee913252614d54b49c Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:53:18 -0700 Subject: [PATCH 6/8] why am i so bad at coding --- .../java/io/druid/indexing/overlord/RemoteTaskRunner.java | 1 + .../src/main/java/io/druid/indexing/overlord/ZkWorker.java | 2 +- .../overlord/setup/FillCapacityWorkerSelectStrategy.java | 5 ----- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 7dc1322a366..e9bba479c64 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -544,6 +544,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer announceTask(task, zkWorker, taskRunnerWorkItem); return true; } else { + log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return false; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java index c5bdc254193..abc4da0ad57 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java @@ -160,7 +160,7 @@ public class ZkWorker implements Closeable public ImmutableZkWorker toImmutable() { - return new ImmutableZkWorker(this); + return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups()); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index 59476b46e6d..db305136d43 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -24,10 +24,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.google.inject.Inject; -import com.metamx.emitter.EmittingLogger; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.ImmutableZkWorker; -import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import java.util.Comparator; @@ -37,8 +35,6 @@ import java.util.TreeSet; */ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy { - private static final EmittingLogger log = new EmittingLogger(FillCapacityWorkerSelectStrategy.class); - private final RemoteTaskRunnerConfig config; @Inject @@ -77,7 +73,6 @@ public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy return Optional.of(zkWorker); } } - log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return Optional.absent(); } From 06757034f2d55738b05216a39143fdc5850c17f3 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 11:54:29 -0700 Subject: [PATCH 7/8] add default impl --- .../io/druid/indexing/overlord/setup/WorkerSelectStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 80103b6b721..46e400e994e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -32,7 +32,7 @@ import java.util.Map; /** * The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to. */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class) }) From fab7caafffa8f15da53c5e1252d2fb501c44a27f Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 12:32:14 -0700 Subject: [PATCH 8/8] final code reviews --- .../main/java/io/druid/indexing/overlord/ImmutableZkWorker.java | 2 +- .../src/main/java/io/druid/indexing/worker/Worker.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java index 51d05d222d0..f657b6a2435 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java @@ -66,4 +66,4 @@ public class ImmutableZkWorker return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity() && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); } -} \ No newline at end of file +} diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java index c706122e649..ac41aa7292a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/Worker.java @@ -76,7 +76,7 @@ public class Worker return "Worker{" + "host='" + host + '\'' + ", ip='" + ip + '\'' + - ", capacity=" + capacity + + ", capacity=" + capacity + ", version='" + version + '\'' + '}'; }