Merge pull request #1067 from druid-io/fix-worker-select

Make dynamic worker selection actually work as intended
This commit is contained in:
Xavier Léauté 2015-01-27 14:54:16 -08:00
commit 05b2bf3fdd
9 changed files with 30 additions and 220 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -48,11 +49,11 @@ import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
@ -108,7 +109,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
// all workers that exist in ZK
private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<>();
@ -134,7 +135,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
WorkerSelectStrategy strategy
Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.jsonMapper = jsonMapper;
@ -144,7 +145,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient;
this.strategy = strategy;
this.workerConfigRef = workerConfigRef;
}
@LifecycleStart
@ -530,6 +531,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return true;
} else {
// Nothing running this task, announce it in ZK for a worker to run it
WorkerBehaviorConfig workerConfig = workerConfigRef.get();
WorkerSelectStrategy strategy;
if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
log.warn("No worker selections strategy set. Using default.");
strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
} else {
strategy = workerConfig.getSelectStrategy();
}
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(

View File

@ -42,7 +42,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final IndexerZkConfig zkPaths;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
@Inject
public RemoteTaskRunnerFactory(
@ -51,7 +51,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final IndexerZkConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
final Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.curator = curator;
@ -59,17 +59,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
if (workerBehaviourConfigSupplier != null) {
// Backwards compatibility
final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviorConfig != null) {
this.strategy = workerBehaviorConfig.getSelectStrategy();
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
this.workerConfigRef = workerConfigRef;
}
@Override
@ -85,7 +75,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.withCompressed(true)
.build(),
httpClient,
strategy
workerConfigRef
);
}
}

View File

@ -77,7 +77,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
if (workerConfig == null || workerConfig.getAutoScaler() == null) {
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@ -44,7 +43,6 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
@ -61,7 +59,6 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -82,9 +79,6 @@ public class OverlordResource
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
@Deprecated
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@Inject
public OverlordResource(
TaskMaster taskMaster,
@ -198,37 +192,6 @@ public class OverlordResource
return Response.ok().build();
}
@Deprecated
@GET
@Path("/worker/setup")
@Produces(MediaType.APPLICATION_JSON)
public Response getWorkerSetupData()
{
if (workerSetupDataRef == null) {
workerSetupDataRef = configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class);
}
return Response.ok(workerSetupDataRef.get()).build();
}
@Deprecated
@POST
@Path("/worker/setup")
@Consumes(MediaType.APPLICATION_JSON)
public Response setWorkerSetupData(
final WorkerSetupData workerSetupData
)
{
if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
log.info("Updating Worker Setup configs: %s", workerSetupData);
return Response.ok().build();
}
@POST
@Path("/action")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -22,12 +22,20 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
/**
*/
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy();
public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
public static WorkerBehaviorConfig defaultConfig()
{
return new WorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER);
}
private final WorkerSelectStrategy selectStrategy;
private final AutoScaler autoScaler;

View File

@ -1,97 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
/**
*/
@Deprecated
public class WorkerSetupData
{
public static final String CONFIG_KEY = "worker.setup";
private final int minNumWorkers;
private final int maxNumWorkers;
private final String availabilityZone;
private final EC2NodeData nodeData;
private final EC2UserData userData;
@JsonCreator
public WorkerSetupData(
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("availabilityZone") String availabilityZone,
@JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") EC2UserData userData
)
{
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.availabilityZone = availabilityZone;
this.nodeData = nodeData;
this.userData = userData;
}
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@JsonProperty
public int getMaxNumWorkers()
{
return maxNumWorkers;
}
@JsonProperty
public String getAvailabilityZone()
{
return availabilityZone;
}
@JsonProperty
public EC2NodeData getNodeData()
{
return nodeData;
}
@JsonProperty
public EC2UserData getUserData()
{
return userData;
}
@Override
public String toString()
{
return "WorkerSetupData{" +
", minNumWorkers=" + minNumWorkers +
", maxNumWorkers=" + maxNumWorkers +
", availabilityZone=" + availabilityZone +
", nodeData=" + nodeData +
", userData=" + userData +
'}';
}
}

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
@ -40,6 +41,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -59,6 +61,7 @@ import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
{
@ -407,7 +410,7 @@ public class RemoteTaskRunnerTest
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
new FillCapacityWorkerSelectStrategy()
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig()))
);
remoteTaskRunner.start();

View File

@ -1,64 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Charsets;
import com.metamx.common.StringUtils;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
import org.apache.commons.codec.binary.Base64;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@Deprecated
public class WorkerSetupDataTest
{
@Test
public void testGalaxyEC2UserDataSerde() throws IOException
{
final String json = "{\"env\":\"druid\",\"version\":null,\"type\":\"typical\"}";
final GalaxyEC2UserData userData = (GalaxyEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
Assert.assertEquals("druid", userData.getEnv());
Assert.assertEquals("typical", userData.getType());
Assert.assertNull(userData.getVersion());
Assert.assertEquals("1234", userData.withVersion("1234").getVersion());
}
@Test
public void testStringEC2UserDataSerde() throws IOException
{
final String json = "{\"impl\":\"string\",\"data\":\"hey :ver:\",\"versionReplacementString\":\":ver:\",\"version\":\"1234\"}";
final StringEC2UserData userData = (StringEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
Assert.assertEquals("hey :ver:", userData.getData());
Assert.assertEquals("1234", userData.getVersion());
Assert.assertEquals(
Base64.encodeBase64String(StringUtils.toUtf8("hey 1234")),
userData.getUserDataBase64()
);
Assert.assertEquals(
Base64.encodeBase64String(StringUtils.toUtf8("hey xyz")),
userData.withVersion("xyz").getUserDataBase64()
);
}
}

View File

@ -69,7 +69,6 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.http.RedirectFilter;
@ -200,7 +199,6 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
}