address code review

This commit is contained in:
fjy 2014-11-24 10:52:30 -08:00
parent 64719b15e0
commit 1aaea9a0d7
7 changed files with 32 additions and 26 deletions

View File

@ -27,7 +27,7 @@ import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
@ -50,7 +50,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviourConfig> workerBehaviourConfigSupplier
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
)
{
this.curator = curator;
@ -60,9 +60,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.httpClient = httpClient;
if (workerBehaviourConfigSupplier != null) {
// Backwards compatibility
final WorkerBehaviourConfig workerBehaviourConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviourConfig != null) {
this.strategy = workerBehaviourConfig.getSelectStrategy();
final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviorConfig != null) {
this.strategy = workerBehaviorConfig.getSelectStrategy();
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}

View File

@ -34,7 +34,7 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -49,7 +49,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerBehaviourConfig> workerConfigRef;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScalingStats scalingStats;
private final Object lock = new Object();
@ -63,7 +63,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Inject
public SimpleResourceManagementStrategy(
SimpleResourceManagementConfig config,
Supplier<WorkerBehaviourConfig> workerConfigRef
Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.config = config;
@ -76,7 +76,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviourConfig workerConfig = workerConfigRef.get();
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot provision new workers.");
return false;
@ -142,7 +142,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
synchronized (lock) {
final WorkerBehaviourConfig workerConfig = workerConfigRef.get();
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
log.warn("No workerConfig available, cannot terminate workers.");
return false;
@ -279,7 +279,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
private void updateTargetWorkerCount(
final WorkerBehaviourConfig workerConfig,
final WorkerBehaviorConfig workerConfig,
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)

View File

@ -42,7 +42,7 @@ import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
@ -78,7 +78,7 @@ public class OverlordResource
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
private AtomicReference<WorkerBehaviourConfig> workerConfigRef = null;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
@Deprecated
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@ -195,7 +195,7 @@ public class OverlordResource
public Response getWorkerConfig()
{
if (workerConfigRef == null) {
workerConfigRef = configManager.watch(WorkerBehaviourConfig.CONFIG_KEY, WorkerBehaviourConfig.class);
workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
}
return Response.ok(workerConfigRef.get()).build();
@ -205,14 +205,14 @@ public class OverlordResource
@Path("/worker")
@Consumes("application/json")
public Response setWorkerConfig(
final WorkerBehaviourConfig workerBehaviourConfig
final WorkerBehaviorConfig workerBehaviorConfig
)
{
if (!configManager.set(WorkerBehaviourConfig.CONFIG_KEY, workerBehaviourConfig)) {
if (!configManager.set(WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
log.info("Updating Worker configs: %s", workerBehaviourConfig);
log.info("Updating Worker configs: %s", workerBehaviorConfig);
return Response.ok().build();
}

View File

@ -25,7 +25,7 @@ import io.druid.indexing.overlord.autoscaling.AutoScaler;
/**
*/
public class WorkerBehaviourConfig
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
@ -33,7 +33,7 @@ public class WorkerBehaviourConfig
private final AutoScaler autoScaler;
@JsonCreator
public WorkerBehaviourConfig(
public WorkerBehaviorConfig(
@JsonProperty("selectStrategy") WorkerSelectStrategy selectStrategy,
@JsonProperty("autoScaler") AutoScaler autoScaler
)
@ -64,7 +64,7 @@ public class WorkerBehaviourConfig
return false;
}
WorkerBehaviourConfig that = (WorkerBehaviourConfig) o;
WorkerBehaviorConfig that = (WorkerBehaviorConfig) o;
if (autoScaler != null ? !autoScaler.equals(that.autoScaler) : that.autoScaler != null) {
return false;

View File

@ -32,7 +32,7 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
import io.druid.indexing.overlord.setup.WorkerBehaviourConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -59,7 +59,7 @@ public class SimpleResourceManagementStrategyTest
private Task testTask;
private SimpleResourceManagementConfig simpleResourceManagementConfig;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerBehaviourConfig> workerConfig;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
@Before
public void setUp() throws Exception
@ -93,7 +93,7 @@ public class SimpleResourceManagementStrategyTest
.setWorkerVersion("");
workerConfig = new AtomicReference<>(
new WorkerBehaviourConfig(
new WorkerBehaviorConfig(
null,
autoScaler
)

View File

@ -34,12 +34,12 @@ import org.junit.Test;
import java.util.Arrays;
public class WorkerBehaviourConfigTest
public class WorkerBehaviorConfigTest
{
@Test
public void testSerde() throws Exception
{
WorkerBehaviourConfig config = new WorkerBehaviourConfig(
WorkerBehaviorConfig config = new WorkerBehaviorConfig(
new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost"))
@ -82,6 +82,6 @@ public class WorkerBehaviourConfigTest
}
}
);
Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), WorkerBehaviourConfig.class));
Assert.assertEquals(config, mapper.readValue(mapper.writeValueAsBytes(config), WorkerBehaviorConfig.class));
}
}

View File

@ -35,6 +35,7 @@ import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
@ -67,6 +68,8 @@ import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.http.RedirectFilter;
@ -196,6 +199,9 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
}
private void configureAutoscale(Binder binder)