optionally choose what worker to send tasks to

This commit is contained in:
fjy 2014-11-17 14:50:56 -08:00
parent d5c4282766
commit 1af6b337f2
6 changed files with 147 additions and 20 deletions

View File

@ -84,7 +84,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
log.warn("No workerSetupData available, cannot provision new workers.");
return false;
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
@ -181,7 +181,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
@ -246,11 +246,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private static Predicate<ZkWorker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
final SimpleResourceManagementConfig config
)
{
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config);
return new Predicate<ZkWorker>()
{
@ -265,8 +264,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private static Predicate<ZkWorker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
final SimpleResourceManagementConfig config
)
{
return new Predicate<ZkWorker>()
@ -292,9 +290,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) {
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config, workerSetupData)
createValidWorkerPredicate(config)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final int minWorkerCount = workerSetupData.getMinNumWorkers();
final int maxWorkerCount = workerSetupData.getMaxNumWorkers();
@ -322,7 +320,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
final boolean notTakingActions = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty();
&& currentlyTerminating.isEmpty();
final boolean shouldScaleUp = notTakingActions
&& validWorkers.size() >= targetWorkerCount
&& targetWorkerCount < maxWorkerCount

View File

@ -0,0 +1,22 @@
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Maps;
import javax.validation.constraints.NotNull;
import java.util.Map;
/**
*/
public class FillCapacityWithPreferenceConfig
{
@JsonProperty
@NotNull
// key:Datasource, value:nodeHostName
private Map<String, String> preferences = Maps.newHashMap();
public Map<String, String> getPreferences()
{
return preferences;
}
}

View File

@ -0,0 +1,44 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
/**
*/
public class FillCapacityWithPreferenceWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final FillCapacityWithPreferenceConfig preferenceConfig;
private final RemoteTaskRunnerConfig config;
@Inject
public FillCapacityWithPreferenceWorkerSelectStrategy(
FillCapacityWithPreferenceConfig preferenceConfig,
RemoteTaskRunnerConfig config
)
{
super(config);
this.preferenceConfig = preferenceConfig;
this.config = config;
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
)
{
String preferredZkWorker = preferenceConfig.getPreferences().get(task.getDataSource());
ImmutableZkWorker zkWorker = zkWorkers.get(preferredZkWorker);
final String minWorkerVer = config.getMinWorkerVersion();
if (zkWorker != null && zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}
return super.findWorkerForTask(zkWorkers, task);
}
}

View File

@ -19,23 +19,14 @@
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ZkWorker;
import java.util.Map;
/**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class)
})
public interface WorkerSelectStrategy
{
/**

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.google.api.client.util.Sets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Map;
public class FillCapacityWithPreferenceWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithPreferenceWorkerSelectStrategy(
new FillCapacityWithPreferenceConfig()
{
@Override
public Map<String, String> getPreferences()
{
return ImmutableMap.of("foo", "localhost");
}
},
new RemoteTaskRunnerConfig()
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"),
0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
}

View File

@ -71,6 +71,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWithPreferenceWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
@ -117,7 +118,9 @@ public class CliOverlord extends ServerRunnable
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
@ -215,6 +218,10 @@ public class CliOverlord extends ServerRunnable
stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class);
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
stratBinder.addBinding("fillCapacityWithPreference")
.to(FillCapacityWithPreferenceWorkerSelectStrategy.class);
binder.bind(FillCapacityWithPreferenceWorkerSelectStrategy.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)