mirror of https://github.com/apache/druid.git
Make dynamic worker selection actually work
This commit is contained in:
parent
b78ef22829
commit
2c2771b90e
|
@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
|
|||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -48,11 +49,11 @@ import io.druid.curator.cache.PathChildrenCacheFactory;
|
|||
import io.druid.indexing.common.TaskStatus;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
|
||||
import io.druid.indexing.worker.TaskAnnouncement;
|
||||
import io.druid.indexing.worker.Worker;
|
||||
import io.druid.server.initialization.IndexerZkConfig;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.tasklogs.TaskLogStreamer;
|
||||
import org.apache.commons.lang.mutable.MutableInt;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
@ -108,7 +109,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
private final PathChildrenCacheFactory pathChildrenCacheFactory;
|
||||
private final PathChildrenCache workerPathCache;
|
||||
private final HttpClient httpClient;
|
||||
private final WorkerSelectStrategy strategy;
|
||||
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
|
||||
|
||||
// all workers that exist in ZK
|
||||
private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<>();
|
||||
|
@ -134,7 +135,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
CuratorFramework cf,
|
||||
PathChildrenCacheFactory pathChildrenCacheFactory,
|
||||
HttpClient httpClient,
|
||||
WorkerSelectStrategy strategy
|
||||
Supplier<WorkerBehaviorConfig> workerConfigRef
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
@ -144,7 +145,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
|
||||
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
|
||||
this.httpClient = httpClient;
|
||||
this.strategy = strategy;
|
||||
this.workerConfigRef = workerConfigRef;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
@ -530,6 +531,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
return true;
|
||||
} else {
|
||||
// Nothing running this task, announce it in ZK for a worker to run it
|
||||
WorkerBehaviorConfig workerConfig = workerConfigRef.get();
|
||||
WorkerSelectStrategy strategy;
|
||||
if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
|
||||
log.warn("No worker selections strategy set. Using default.");
|
||||
strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
|
||||
} else {
|
||||
strategy = workerConfig.getSelectStrategy();
|
||||
}
|
||||
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
|
||||
config,
|
||||
ImmutableMap.copyOf(
|
||||
|
|
|
@ -42,7 +42,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
|||
private final IndexerZkConfig zkPaths;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final WorkerSelectStrategy strategy;
|
||||
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
|
||||
|
||||
@Inject
|
||||
public RemoteTaskRunnerFactory(
|
||||
|
@ -51,7 +51,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
|||
final IndexerZkConfig zkPaths,
|
||||
final ObjectMapper jsonMapper,
|
||||
@Global final HttpClient httpClient,
|
||||
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
|
||||
final Supplier<WorkerBehaviorConfig> workerConfigRef
|
||||
)
|
||||
{
|
||||
this.curator = curator;
|
||||
|
@ -59,17 +59,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
|||
this.zkPaths = zkPaths;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.httpClient = httpClient;
|
||||
if (workerBehaviourConfigSupplier != null) {
|
||||
// Backwards compatibility
|
||||
final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get();
|
||||
if (workerBehaviorConfig != null) {
|
||||
this.strategy = workerBehaviorConfig.getSelectStrategy();
|
||||
} else {
|
||||
this.strategy = new FillCapacityWorkerSelectStrategy();
|
||||
}
|
||||
} else {
|
||||
this.strategy = new FillCapacityWorkerSelectStrategy();
|
||||
}
|
||||
this.workerConfigRef = workerConfigRef;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,7 +75,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
|
|||
.withCompressed(true)
|
||||
.build(),
|
||||
httpClient,
|
||||
strategy
|
||||
workerConfigRef
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
|||
synchronized (lock) {
|
||||
boolean didProvision = false;
|
||||
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
|
||||
if (workerConfig == null) {
|
||||
if (workerConfig == null || workerConfig.getAutoScaler() == null) {
|
||||
log.warn("No workerConfig available, cannot provision new workers.");
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
@ -44,7 +43,6 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem;
|
|||
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
|
||||
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
|
||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
import io.druid.metadata.EntryExistsException;
|
||||
import io.druid.tasklogs.TaskLogStreamer;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
@ -61,7 +59,6 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -82,9 +79,6 @@ public class OverlordResource
|
|||
|
||||
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
|
||||
|
||||
@Deprecated
|
||||
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
|
||||
|
||||
@Inject
|
||||
public OverlordResource(
|
||||
TaskMaster taskMaster,
|
||||
|
@ -198,37 +192,6 @@ public class OverlordResource
|
|||
return Response.ok().build();
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
@GET
|
||||
@Path("/worker/setup")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response getWorkerSetupData()
|
||||
{
|
||||
if (workerSetupDataRef == null) {
|
||||
workerSetupDataRef = configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class);
|
||||
}
|
||||
|
||||
return Response.ok(workerSetupDataRef.get()).build();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@POST
|
||||
@Path("/worker/setup")
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
public Response setWorkerSetupData(
|
||||
final WorkerSetupData workerSetupData
|
||||
)
|
||||
{
|
||||
if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
|
||||
return Response.status(Response.Status.BAD_REQUEST).build();
|
||||
}
|
||||
|
||||
log.info("Updating Worker Setup configs: %s", workerSetupData);
|
||||
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/action")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
|
|
|
@ -22,12 +22,20 @@ package io.druid.indexing.overlord.setup;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.indexing.overlord.autoscaling.AutoScaler;
|
||||
import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WorkerBehaviorConfig
|
||||
{
|
||||
public static final String CONFIG_KEY = "worker.config";
|
||||
public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy();
|
||||
public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
|
||||
|
||||
public static WorkerBehaviorConfig defaultConfig()
|
||||
{
|
||||
return new WorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER);
|
||||
}
|
||||
|
||||
private final WorkerSelectStrategy selectStrategy;
|
||||
private final AutoScaler autoScaler;
|
||||
|
|
|
@ -1,97 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2NodeData;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Deprecated
|
||||
public class WorkerSetupData
|
||||
{
|
||||
public static final String CONFIG_KEY = "worker.setup";
|
||||
|
||||
private final int minNumWorkers;
|
||||
private final int maxNumWorkers;
|
||||
private final String availabilityZone;
|
||||
private final EC2NodeData nodeData;
|
||||
private final EC2UserData userData;
|
||||
|
||||
@JsonCreator
|
||||
public WorkerSetupData(
|
||||
@JsonProperty("minNumWorkers") int minNumWorkers,
|
||||
@JsonProperty("maxNumWorkers") int maxNumWorkers,
|
||||
@JsonProperty("availabilityZone") String availabilityZone,
|
||||
@JsonProperty("nodeData") EC2NodeData nodeData,
|
||||
@JsonProperty("userData") EC2UserData userData
|
||||
)
|
||||
{
|
||||
this.minNumWorkers = minNumWorkers;
|
||||
this.maxNumWorkers = maxNumWorkers;
|
||||
this.availabilityZone = availabilityZone;
|
||||
this.nodeData = nodeData;
|
||||
this.userData = userData;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getMinNumWorkers()
|
||||
{
|
||||
return minNumWorkers;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getMaxNumWorkers()
|
||||
{
|
||||
return maxNumWorkers;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getAvailabilityZone()
|
||||
{
|
||||
return availabilityZone;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public EC2NodeData getNodeData()
|
||||
{
|
||||
return nodeData;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public EC2UserData getUserData()
|
||||
{
|
||||
return userData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "WorkerSetupData{" +
|
||||
", minNumWorkers=" + minNumWorkers +
|
||||
", maxNumWorkers=" + maxNumWorkers +
|
||||
", availabilityZone=" + availabilityZone +
|
||||
", nodeData=" + nodeData +
|
||||
", userData=" + userData +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import com.google.common.collect.Sets;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.common.guava.DSuppliers;
|
||||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
|
||||
import io.druid.indexing.common.IndexingServiceCondition;
|
||||
|
@ -40,6 +41,7 @@ import io.druid.indexing.common.task.Task;
|
|||
import io.druid.indexing.common.task.TaskResource;
|
||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
|
||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||
import io.druid.indexing.worker.TaskAnnouncement;
|
||||
import io.druid.indexing.worker.Worker;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -59,6 +61,7 @@ import org.junit.Test;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class RemoteTaskRunnerTest
|
||||
{
|
||||
|
@ -407,7 +410,7 @@ public class RemoteTaskRunnerTest
|
|||
cf,
|
||||
new SimplePathChildrenCacheFactory.Builder().build(),
|
||||
null,
|
||||
new FillCapacityWorkerSelectStrategy()
|
||||
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig()))
|
||||
);
|
||||
|
||||
remoteTaskRunner.start();
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.metamx.common.StringUtils;
|
||||
import io.druid.indexing.common.TestUtils;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.EC2UserData;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.GalaxyEC2UserData;
|
||||
import io.druid.indexing.overlord.autoscaling.ec2.StringEC2UserData;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Deprecated
|
||||
public class WorkerSetupDataTest
|
||||
{
|
||||
@Test
|
||||
public void testGalaxyEC2UserDataSerde() throws IOException
|
||||
{
|
||||
final String json = "{\"env\":\"druid\",\"version\":null,\"type\":\"typical\"}";
|
||||
final GalaxyEC2UserData userData = (GalaxyEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
|
||||
Assert.assertEquals("druid", userData.getEnv());
|
||||
Assert.assertEquals("typical", userData.getType());
|
||||
Assert.assertNull(userData.getVersion());
|
||||
Assert.assertEquals("1234", userData.withVersion("1234").getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringEC2UserDataSerde() throws IOException
|
||||
{
|
||||
final String json = "{\"impl\":\"string\",\"data\":\"hey :ver:\",\"versionReplacementString\":\":ver:\",\"version\":\"1234\"}";
|
||||
final StringEC2UserData userData = (StringEC2UserData) TestUtils.MAPPER.readValue(json, EC2UserData.class);
|
||||
Assert.assertEquals("hey :ver:", userData.getData());
|
||||
Assert.assertEquals("1234", userData.getVersion());
|
||||
Assert.assertEquals(
|
||||
Base64.encodeBase64String(StringUtils.toUtf8("hey 1234")),
|
||||
userData.getUserDataBase64()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
Base64.encodeBase64String(StringUtils.toUtf8("hey xyz")),
|
||||
userData.withVersion("xyz").getUserDataBase64()
|
||||
);
|
||||
}
|
||||
}
|
|
@ -69,7 +69,6 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
|
|||
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
|
||||
import io.druid.indexing.overlord.http.OverlordResource;
|
||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
import io.druid.indexing.worker.config.WorkerConfig;
|
||||
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
|
||||
import io.druid.server.http.RedirectFilter;
|
||||
|
@ -200,7 +199,6 @@ public class CliOverlord extends ServerRunnable
|
|||
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
|
||||
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
|
||||
|
||||
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
|
||||
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue