From 2c2771b90eb51e90741c191dce2dd59700b83485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 27 Jan 2015 14:17:12 -0800 Subject: [PATCH] Make dynamic worker selection actually work --- .../indexing/overlord/RemoteTaskRunner.java | 17 +++- .../overlord/RemoteTaskRunnerFactory.java | 18 +--- .../SimpleResourceManagementStrategy.java | 2 +- .../overlord/http/OverlordResource.java | 37 ------- .../overlord/setup/WorkerBehaviorConfig.java | 8 ++ .../overlord/setup/WorkerSetupData.java | 97 ------------------- .../overlord/RemoteTaskRunnerTest.java | 5 +- .../overlord/WorkerSetupDataTest.java | 64 ------------ .../main/java/io/druid/cli/CliOverlord.java | 2 - 9 files changed, 30 insertions(+), 220 deletions(-) delete mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java delete mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index bc70d153c1b..d1365847512 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -25,6 +25,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -48,11 +49,11 @@ import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.server.initialization.IndexerZkConfig; -import io.druid.server.initialization.ZkPathsConfig; import io.druid.tasklogs.TaskLogStreamer; import org.apache.commons.lang.mutable.MutableInt; import org.apache.curator.framework.CuratorFramework; @@ -108,7 +109,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; private final HttpClient httpClient; - private final WorkerSelectStrategy strategy; + private final Supplier workerConfigRef; // all workers that exist in ZK private final ConcurrentMap zkWorkers = new ConcurrentHashMap<>(); @@ -134,7 +135,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer CuratorFramework cf, PathChildrenCacheFactory pathChildrenCacheFactory, HttpClient httpClient, - WorkerSelectStrategy strategy + Supplier workerConfigRef ) { this.jsonMapper = jsonMapper; @@ -144,7 +145,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer this.pathChildrenCacheFactory = pathChildrenCacheFactory; this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath()); this.httpClient = httpClient; - this.strategy = strategy; + this.workerConfigRef = workerConfigRef; } @LifecycleStart @@ -530,6 +531,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return true; } else { // Nothing running this task, announce it in ZK for a worker to run it + WorkerBehaviorConfig workerConfig = workerConfigRef.get(); + WorkerSelectStrategy strategy; + if (workerConfig == null || workerConfig.getSelectStrategy() == null) { + log.warn("No worker selections strategy set. Using default."); + strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY; + } else { + strategy = workerConfig.getSelectStrategy(); + } final Optional immutableZkWorker = strategy.findWorkerForTask( config, ImmutableMap.copyOf( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 5c0e1cca09f..6568c4513b3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -42,7 +42,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory private final IndexerZkConfig zkPaths; private final ObjectMapper jsonMapper; private final HttpClient httpClient; - private final WorkerSelectStrategy strategy; + private final Supplier workerConfigRef; @Inject public RemoteTaskRunnerFactory( @@ -51,7 +51,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory final IndexerZkConfig zkPaths, final ObjectMapper jsonMapper, @Global final HttpClient httpClient, - final Supplier workerBehaviourConfigSupplier + final Supplier workerConfigRef ) { this.curator = curator; @@ -59,17 +59,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory this.zkPaths = zkPaths; this.jsonMapper = jsonMapper; this.httpClient = httpClient; - if (workerBehaviourConfigSupplier != null) { - // Backwards compatibility - final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get(); - if (workerBehaviorConfig != null) { - this.strategy = workerBehaviorConfig.getSelectStrategy(); - } else { - this.strategy = new FillCapacityWorkerSelectStrategy(); - } - } else { - this.strategy = new FillCapacityWorkerSelectStrategy(); - } + this.workerConfigRef = workerConfigRef; } @Override @@ -85,7 +75,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory .withCompressed(true) .build(), httpClient, - strategy + workerConfigRef ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java index 7a8491af27f..4264fe8b638 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/autoscaling/SimpleResourceManagementStrategy.java @@ -77,7 +77,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat synchronized (lock) { boolean didProvision = false; final WorkerBehaviorConfig workerConfig = workerConfigRef.get(); - if (workerConfig == null) { + if (workerConfig == null || workerConfig.getAutoScaler() == null) { log.warn("No workerConfig available, cannot provision new workers."); return false; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 76523504f53..cd0915cef13 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.ByteSource; -import com.google.common.io.InputSupplier; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; @@ -44,7 +43,6 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.metadata.EntryExistsException; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; @@ -61,7 +59,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; -import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.Map; @@ -82,9 +79,6 @@ public class OverlordResource private AtomicReference workerConfigRef = null; - @Deprecated - private AtomicReference workerSetupDataRef = null; - @Inject public OverlordResource( TaskMaster taskMaster, @@ -198,37 +192,6 @@ public class OverlordResource return Response.ok().build(); } - - @Deprecated - @GET - @Path("/worker/setup") - @Produces(MediaType.APPLICATION_JSON) - public Response getWorkerSetupData() - { - if (workerSetupDataRef == null) { - workerSetupDataRef = configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class); - } - - return Response.ok(workerSetupDataRef.get()).build(); - } - - @Deprecated - @POST - @Path("/worker/setup") - @Consumes(MediaType.APPLICATION_JSON) - public Response setWorkerSetupData( - final WorkerSetupData workerSetupData - ) - { - if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) { - return Response.status(Response.Status.BAD_REQUEST).build(); - } - - log.info("Updating Worker Setup configs: %s", workerSetupData); - - return Response.ok().build(); - } - @POST @Path("/action") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java index afaf9171ea8..89926d477eb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerBehaviorConfig.java @@ -22,12 +22,20 @@ package io.druid.indexing.overlord.setup; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.indexing.overlord.autoscaling.AutoScaler; +import io.druid.indexing.overlord.autoscaling.NoopAutoScaler; /** */ public class WorkerBehaviorConfig { public static final String CONFIG_KEY = "worker.config"; + public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy(); + public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler(); + + public static WorkerBehaviorConfig defaultConfig() + { + return new WorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER); + } private final WorkerSelectStrategy selectStrategy; private final AutoScaler autoScaler; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java deleted file mode 100644 index 96e206d0cb3..00000000000 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.indexing.overlord.setup; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData; -import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData; - -/** - */ -@Deprecated -public class WorkerSetupData -{ - public static final String CONFIG_KEY = "worker.setup"; - - private final int minNumWorkers; - private final int maxNumWorkers; - private final String availabilityZone; - private final EC2NodeData nodeData; - private final EC2UserData userData; - - @JsonCreator - public WorkerSetupData( - @JsonProperty("minNumWorkers") int minNumWorkers, - @JsonProperty("maxNumWorkers") int maxNumWorkers, - @JsonProperty("availabilityZone") String availabilityZone, - @JsonProperty("nodeData") EC2NodeData nodeData, - @JsonProperty("userData") EC2UserData userData - ) - { - this.minNumWorkers = minNumWorkers; - this.maxNumWorkers = maxNumWorkers; - this.availabilityZone = availabilityZone; - this.nodeData = nodeData; - this.userData = userData; - } - - @JsonProperty - public int getMinNumWorkers() - { - return minNumWorkers; - } - - @JsonProperty - public int getMaxNumWorkers() - { - return maxNumWorkers; - } - - @JsonProperty - public String getAvailabilityZone() - { - return availabilityZone; - } - - @JsonProperty - public EC2NodeData getNodeData() - { - return nodeData; - } - - @JsonProperty - public EC2UserData getUserData() - { - return userData; - } - - @Override - public String toString() - { - return "WorkerSetupData{" + - ", minNumWorkers=" + minNumWorkers + - ", maxNumWorkers=" + maxNumWorkers + - ", availabilityZone=" + availabilityZone + - ", nodeData=" + nodeData + - ", userData=" + userData + - '}'; - } -} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 63174d9640f..9878f6d5d6b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.indexing.common.IndexingServiceCondition; @@ -40,6 +41,7 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.jackson.DefaultObjectMapper; @@ -59,6 +61,7 @@ import org.junit.Test; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class RemoteTaskRunnerTest { @@ -407,7 +410,7 @@ public class RemoteTaskRunnerTest cf, new SimplePathChildrenCacheFactory.Builder().build(), null, - new FillCapacityWorkerSelectStrategy() + DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())) ); remoteTaskRunner.start(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java deleted file mode 100644 index 8e21266d800..00000000000 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/WorkerSetupDataTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.indexing.overlord; - -import com.google.common.base.Charsets; -import com.metamx.common.StringUtils; -import io.druid.indexing.common.TestUtils; -import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData; -import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData; -import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData; -import org.apache.commons.codec.binary.Base64; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; - -@Deprecated -public class WorkerSetupDataTest -{ - @Test - public void testGalaxyEC2UserDataSerde() throws IOException - { - final String json = "{\"env\":\"druid\",\"version\":null,\"type\":\"typical\"}"; - final GalaxyEC2UserData userData = (GalaxyEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class); - Assert.assertEquals("druid", userData.getEnv()); - Assert.assertEquals("typical", userData.getType()); - Assert.assertNull(userData.getVersion()); - Assert.assertEquals("1234", userData.withVersion("1234").getVersion()); - } - - @Test - public void testStringEC2UserDataSerde() throws IOException - { - final String json = "{\"impl\":\"string\",\"data\":\"hey :ver:\",\"versionReplacementString\":\":ver:\",\"version\":\"1234\"}"; - final StringEC2UserData userData = (StringEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class); - Assert.assertEquals("hey :ver:", userData.getData()); - Assert.assertEquals("1234", userData.getVersion()); - Assert.assertEquals( - Base64.encodeBase64String(StringUtils.toUtf8("hey 1234")), - userData.getUserDataBase64() - ); - Assert.assertEquals( - Base64.encodeBase64String(StringUtils.toUtf8("hey xyz")), - userData.withVersion("xyz").getUserDataBase64() - ); - } -} diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 792fc4155b7..917034283ee 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -69,7 +69,6 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordResource; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.server.http.RedirectFilter; @@ -200,7 +199,6 @@ public class CliOverlord extends ServerRunnable biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); - JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null); }