mirror of https://github.com/apache/druid.git
fix a few naming things
This commit is contained in:
parent
32600e10bb
commit
c91310914b
|
@ -9,7 +9,7 @@ import java.util.Map;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class FillCapacityWithIsolationConfig
|
||||
public class FillCapacityWithAffinityConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@NotNull
|
|
@ -0,0 +1,76 @@
|
|||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.google.api.client.util.Sets;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
|
||||
{
|
||||
private final FillCapacityWithAffinityConfig affinityConfig;
|
||||
private final Set<String> affinityWorkerHosts = Sets.newHashSet();
|
||||
|
||||
|
||||
@Inject
|
||||
public FillCapacityWithAffinityWorkerSelectStrategy(
|
||||
FillCapacityWithAffinityConfig affinityConfig,
|
||||
RemoteTaskRunnerConfig config
|
||||
)
|
||||
{
|
||||
super(config);
|
||||
this.affinityConfig = affinityConfig;
|
||||
for (List<String> affinityWorkers : affinityConfig.getPreferences().values()) {
|
||||
for (String affinityWorker : affinityWorkers) {
|
||||
this.affinityWorkerHosts.add(affinityWorker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
||||
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
|
||||
)
|
||||
{
|
||||
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
|
||||
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
|
||||
for (String workerHost : zkWorkers.keySet()) {
|
||||
if (!affinityWorkerHosts.contains(workerHost)) {
|
||||
builder.put(workerHost, zkWorkers.get(workerHost));
|
||||
}
|
||||
}
|
||||
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
|
||||
|
||||
List<String> workerHosts = affinityConfig.getPreferences().get(task.getDataSource());
|
||||
if (workerHosts == null) {
|
||||
return super.findWorkerForTask(eligibleWorkers, task);
|
||||
}
|
||||
|
||||
ImmutableMap.Builder<String, ImmutableZkWorker> affinityBuilder = new ImmutableMap.Builder<>();
|
||||
for (String workerHost : workerHosts) {
|
||||
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
|
||||
if (zkWorker != null) {
|
||||
affinityBuilder.put(workerHost, zkWorker);
|
||||
}
|
||||
}
|
||||
ImmutableMap<String, ImmutableZkWorker> affinityWorkers = affinityBuilder.build();
|
||||
|
||||
if (!affinityWorkers.isEmpty()) {
|
||||
Optional<ImmutableZkWorker> retVal = super.findWorkerForTask(affinityWorkers, task);
|
||||
if (retVal.isPresent()) {
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
return super.findWorkerForTask(eligibleWorkers, task);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -1,72 +0,0 @@
|
|||
package io.druid.indexing.overlord.setup;
|
||||
|
||||
import com.google.api.client.util.Sets;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.ImmutableZkWorker;
|
||||
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class FillCapacityWithIsolationWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
|
||||
{
|
||||
private final FillCapacityWithIsolationConfig isolationConfig;
|
||||
private final Set<String> isolatedWorkers = Sets.newHashSet();
|
||||
|
||||
|
||||
@Inject
|
||||
public FillCapacityWithIsolationWorkerSelectStrategy(
|
||||
FillCapacityWithIsolationConfig isolationConfig,
|
||||
RemoteTaskRunnerConfig config
|
||||
)
|
||||
{
|
||||
super(config);
|
||||
this.isolationConfig = isolationConfig;
|
||||
for (List<String> isolated : isolationConfig.getPreferences().values()) {
|
||||
for (String isolatedWorker : isolated) {
|
||||
isolatedWorkers.add(isolatedWorker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ImmutableZkWorker> findWorkerForTask(
|
||||
ImmutableMap<String, ImmutableZkWorker> zkWorkers, Task task
|
||||
)
|
||||
{
|
||||
// remove isolatedWorkers
|
||||
ImmutableMap.Builder<String, ImmutableZkWorker> builder = new ImmutableMap.Builder<>();
|
||||
for (String workerHost : zkWorkers.keySet()) {
|
||||
if (!isolatedWorkers.contains(workerHost)) {
|
||||
builder.put(workerHost, zkWorkers.get(workerHost));
|
||||
}
|
||||
}
|
||||
ImmutableMap<String, ImmutableZkWorker> eligibleWorkers = builder.build();
|
||||
|
||||
List<String> workerHosts = isolationConfig.getPreferences().get(task.getDataSource());
|
||||
if (workerHosts == null) {
|
||||
return super.findWorkerForTask(eligibleWorkers, task);
|
||||
}
|
||||
|
||||
ImmutableMap.Builder<String, ImmutableZkWorker> isolatedBuilder = new ImmutableMap.Builder<>();
|
||||
for (String workerHost : workerHosts) {
|
||||
ImmutableZkWorker zkWorker = zkWorkers.get(workerHost);
|
||||
if (zkWorker != null) {
|
||||
isolatedBuilder.put(workerHost, zkWorker);
|
||||
}
|
||||
}
|
||||
ImmutableMap<String, ImmutableZkWorker> isolatedWorkers = isolatedBuilder.build();
|
||||
|
||||
if (isolatedWorkers.isEmpty()) {
|
||||
return super.findWorkerForTask(eligibleWorkers, task);
|
||||
} else {
|
||||
return super.findWorkerForTask(isolatedWorkers, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -14,13 +14,13 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class FillCapacityWithIsolationWorkerSelectStrategyTest
|
||||
public class FillCapacityWithAffinityWorkerSelectStrategyTest
|
||||
{
|
||||
@Test
|
||||
public void testFindWorkerForTask() throws Exception
|
||||
{
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
|
||||
new FillCapacityWithIsolationConfig()
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
|
||||
new FillCapacityWithAffinityConfig()
|
||||
{
|
||||
@Override
|
||||
public Map<String, List<String>> getPreferences()
|
||||
|
@ -60,8 +60,8 @@ public class FillCapacityWithIsolationWorkerSelectStrategyTest
|
|||
@Test
|
||||
public void testFindWorkerForTaskWithNulls() throws Exception
|
||||
{
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
|
||||
new FillCapacityWithIsolationConfig()
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
|
||||
new FillCapacityWithAffinityConfig()
|
||||
{
|
||||
@Override
|
||||
public Map<String, List<String>> getPreferences()
|
||||
|
@ -94,8 +94,8 @@ public class FillCapacityWithIsolationWorkerSelectStrategyTest
|
|||
@Test
|
||||
public void testIsolation() throws Exception
|
||||
{
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithIsolationWorkerSelectStrategy(
|
||||
new FillCapacityWithIsolationConfig()
|
||||
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
|
||||
new FillCapacityWithAffinityConfig()
|
||||
{
|
||||
@Override
|
||||
public Map<String, List<String>> getPreferences()
|
|
@ -71,7 +71,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl
|
|||
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
|
||||
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
|
||||
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
|
||||
import io.druid.indexing.overlord.setup.FillCapacityWithIsolationWorkerSelectStrategy;
|
||||
import io.druid.indexing.overlord.setup.FillCapacityWithAffinityWorkerSelectStrategy;
|
||||
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
|
||||
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
|
||||
import io.druid.indexing.overlord.setup.WorkerSetupData;
|
||||
|
@ -220,8 +220,8 @@ public class CliOverlord extends ServerRunnable
|
|||
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
|
||||
|
||||
stratBinder.addBinding("fillCapacityWithPreference")
|
||||
.to(FillCapacityWithIsolationWorkerSelectStrategy.class);
|
||||
binder.bind(FillCapacityWithIsolationWorkerSelectStrategy.class).in(LazySingleton.class);
|
||||
.to(FillCapacityWithAffinityWorkerSelectStrategy.class);
|
||||
binder.bind(FillCapacityWithAffinityWorkerSelectStrategy.class).in(LazySingleton.class);
|
||||
}
|
||||
|
||||
private void configureAutoscale(Binder binder)
|
||||
|
|
Loading…
Reference in New Issue