address code review

This commit is contained in:
fjy 2014-11-17 15:55:22 -08:00
parent 1af6b337f2
commit 32600e10bb
6 changed files with 201 additions and 116 deletions

View File

@ -4,18 +4,19 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Maps;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
/**
*/
public class FillCapacityWithPreferenceConfig
public class FillCapacityWithIsolationConfig
{
@JsonProperty
@NotNull
// key:Datasource, value:nodeHostName
private Map<String, String> preferences = Maps.newHashMap();
// key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> preferences = Maps.newHashMap();
public Map<String, String> getPreferences()
public Map<String, List<String>> getPreferences()
{
return preferences;
}

View File

@ -0,0 +1,72 @@
package io.druid.indexing.overlord.setup;
import com.google.api.client.util.Sets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.List;
import java.util.Set;
/**
*/
public class FillCapacityWithIsolationWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final FillCapacityWithIsolationConfig isolationConfig;
private final Set<String> isolatedWorkers = Sets.newHashSet();
@Inject
public FillCapacityWithIsolationWorkerSelectStrategy(
FillCapacityWithIsolationConfig isolationConfig,
RemoteTaskRunnerConfig config
)
{
super(config);
this.isolationConfig = isolationConfig;
for (List<String> isolated : isolationConfig.getPreferences().values()) {
for (String isolatedWorker : isolated) {
isolatedWorkers.add(isolatedWorker);
}
}
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
)
{
// remove isolatedWorkers
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!isolatedWorkers.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
List<String> workerHosts = isolationConfig.getPreferences().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(eligibleWorkers, task);
}
ImmutableMap.Builder<String, ImmutableZkWorker> isolatedBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
isolatedBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableZkWorker> isolatedWorkers = isolatedBuilder.build();
if (isolatedWorkers.isEmpty()) {
return super.findWorkerForTask(eligibleWorkers, task);
} else {
return super.findWorkerForTask(isolatedWorkers, task);
}
}
}

View File

@ -1,44 +0,0 @@
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
/**
*/
public class FillCapacityWithPreferenceWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final FillCapacityWithPreferenceConfig preferenceConfig;
private final RemoteTaskRunnerConfig config;
@Inject
public FillCapacityWithPreferenceWorkerSelectStrategy(
FillCapacityWithPreferenceConfig preferenceConfig,
RemoteTaskRunnerConfig config
)
{
super(config);
this.preferenceConfig = preferenceConfig;
this.config = config;
}
@Override
public Optional<ImmutableZkWorker> findWorkerForTask(
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
)
{
String preferredZkWorker = preferenceConfig.getPreferences().get(task.getDataSource());
ImmutableZkWorker zkWorker = zkWorkers.get(preferredZkWorker);
final String minWorkerVer = config.getMinWorkerVersion();
if (zkWorker != null && zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}
return super.findWorkerForTask(zkWorkers, task);
}
}

View File

@ -0,0 +1,121 @@
package io.druid.indexing.overlord.setup;
import com.google.api.client.util.Sets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class FillCapacityWithIsolationWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
new FillCapacityWithIsolationConfig()
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@Test
public void testFindWorkerForTaskWithNulls() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
new FillCapacityWithIsolationConfig()
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@Test
public void testIsolation() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
new FillCapacityWithIsolationConfig()
{
@Override
public Map<String, List<String>> getPreferences()
{
return ImmutableMap.of("foo", Arrays.asList("localhost"));
}
},
new RemoteTaskRunnerConfig()
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
Assert.assertFalse(optional.isPresent());
}
}

View File

@ -1,65 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.google.api.client.util.Sets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Map;
public class FillCapacityWithPreferenceWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithPreferenceWorkerSelectStrategy(
new FillCapacityWithPreferenceConfig()
{
@Override
public Map<String, String> getPreferences()
{
return ImmutableMap.of("foo", "localhost");
}
},
new RemoteTaskRunnerConfig()
);
Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
ImmutableMap.of(
"localhost",
new ImmutableZkWorker(
new Worker("localhost", "localhost", 1, "v1"),
0,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null)
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
}

View File

@ -71,7 +71,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWithPreferenceWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWithIsolationWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
@ -220,8 +220,8 @@ public class CliOverlord extends ServerRunnable
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
stratBinder.addBinding("fillCapacityWithPreference")
.to(FillCapacityWithPreferenceWorkerSelectStrategy.class);
binder.bind(FillCapacityWithPreferenceWorkerSelectStrategy.class).in(LazySingleton.class);
.to(FillCapacityWithIsolationWorkerSelectStrategy.class);
binder.bind(FillCapacityWithIsolationWorkerSelectStrategy.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)