1) Stop polling in ConfigManager when stop is called

2) Remove WorkSetupManager in favor of just using ConfigManager
This commit is contained in:
Eric Tschetter 2013-03-07 17:43:37 -06:00
parent 1c3ef48f34
commit f70f71243d
13 changed files with 106 additions and 172 deletions

View File

@ -1,6 +1,5 @@
package com.metamx.druid.config; package com.metamx.druid.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -40,6 +39,8 @@ public class ConfigManager
private final ConcurrentMap<String, ConfigHolder> watchedConfigs; private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
private final String selectStatement; private final String selectStatement;
private volatile ConfigManager.PollingCallable poller;
public ConfigManager(IDBI dbi, ConfigManagerConfig config) public ConfigManager(IDBI dbi, ConfigManagerConfig config)
{ {
this.dbi = dbi; this.dbi = dbi;
@ -58,19 +59,8 @@ public class ConfigManager
return; return;
} }
ScheduledExecutors.scheduleWithFixedDelay( poller = new PollingCallable();
exec, ScheduledExecutors.scheduleWithFixedDelay(exec, new Duration(0), config.getPollDuration(), poller);
new Duration(0),
config.getPollDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
);
started = true; started = true;
} }
@ -84,6 +74,9 @@ public class ConfigManager
return; return;
} }
poller.stop();
poller = null;
started = false; started = false;
} }
} }
@ -119,8 +112,7 @@ public class ConfigManager
{ {
if (!started) { if (!started) {
watchedConfigs.put(key, new ConfigHolder<T>(null, serde)); watchedConfigs.put(key, new ConfigHolder<T>(null, serde));
} } else {
else {
try { try {
// Multiple of these callables can be submitted at the same time, but the callables themselves // Multiple of these callables can be submitted at the same time, but the callables themselves
// are executed serially, so double check that it hasn't already been populated. // are executed serially, so double check that it hasn't already been populated.
@ -200,10 +192,12 @@ public class ConfigManager
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement("INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload") handle.createStatement(
.bind("name", key) "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload"
.bind("payload", newBytes) )
.execute(); .bind("name", key)
.bind("payload", newBytes)
.execute();
return null; return null;
} }
} }
@ -256,4 +250,25 @@ public class ConfigManager
return false; return false;
} }
} }
private class PollingCallable implements Callable<ScheduledExecutors.Signal>
{
private volatile boolean stop = false;
void stop()
{
stop = true;
}
@Override
public ScheduledExecutors.Signal call() throws Exception
{
if (stop) {
return ScheduledExecutors.Signal.STOP;
}
poll();
return ScheduledExecutors.Signal.REPEAT;
}
}
} }

View File

@ -35,7 +35,7 @@ import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFramework;
@ -57,6 +57,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure
@ -82,7 +83,7 @@ public class RemoteTaskRunner implements TaskRunner
private final PathChildrenCache workerPathCache; private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec; private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory; private final RetryPolicyFactory retryPolicyFactory;
private final WorkerSetupManager workerSetupManager; private final AtomicReference<WorkerSetupData> workerSetupData;
// all workers that exist in ZK // all workers that exist in ZK
private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>(); private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
@ -104,7 +105,7 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache, PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec, ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory, RetryPolicyFactory retryPolicyFactory,
WorkerSetupManager workerSetupManager AtomicReference<WorkerSetupData> workerSetupData
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
@ -113,7 +114,7 @@ public class RemoteTaskRunner implements TaskRunner
this.workerPathCache = workerPathCache; this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec; this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory; this.retryPolicyFactory = retryPolicyFactory;
this.workerSetupManager = workerSetupManager; this.workerSetupData = workerSetupData;
} }
@LifecycleStart @LifecycleStart
@ -548,7 +549,7 @@ public class RemoteTaskRunner implements TaskRunner
return (!input.isAtCapacity() && return (!input.isAtCapacity() &&
input.getWorker() input.getWorker()
.getVersion() .getVersion()
.compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0); .compareTo(workerSetupData.get().getMinVersion()) >= 0);
} }
} }
) )

View File

@ -54,9 +54,9 @@ import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox; import com.metamx.druid.merger.common.actions.TaskActionToolbox;
@ -89,7 +89,7 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerCo
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy; import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig; import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters; import com.metamx.emitter.core.Emitters;
@ -124,6 +124,7 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
@ -157,7 +158,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
private CuratorFramework curatorFramework = null; private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null; private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig; private IndexerZkConfig indexerZkConfig;
private WorkerSetupManager workerSetupManager = null;
private TaskRunnerFactory taskRunnerFactory = null; private TaskRunnerFactory taskRunnerFactory = null;
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null; private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
private TaskMasterLifecycle taskMasterLifecycle = null; private TaskMasterLifecycle taskMasterLifecycle = null;
@ -228,12 +228,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this; return this;
} }
public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager)
{
this.workerSetupManager = workerSetupManager;
return this;
}
public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
{ {
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory = taskRunnerFactory;
@ -250,6 +244,10 @@ public class IndexerCoordinatorNode extends RegisteringNode
{ {
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
JacksonConfigManager configManager = new JacksonConfigManager(new ConfigManager(dbi, managerConfig), jsonMapper);
initializeEmitter(); initializeEmitter();
initializeMonitors(); initializeMonitors();
initializeDB(); initializeDB();
@ -266,9 +264,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeCurator(); initializeCurator();
initializeIndexerZkConfig(); initializeIndexerZkConfig();
initializeWorkerSetupManager(); initializeTaskRunnerFactory(configManager);
initializeTaskRunnerFactory(); initializeResourceManagement(configManager);
initializeResourceManagement();
initializeTaskMasterLifecycle(); initializeTaskMasterLifecycle();
initializeServer(); initializeServer();
@ -288,7 +285,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
emitter, emitter,
taskMasterLifecycle, taskMasterLifecycle,
new TaskStorageQueryAdapter(taskStorage), new TaskStorageQueryAdapter(taskStorage),
workerSetupManager configManager
) )
); );
@ -565,20 +562,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
} }
} }
public void initializeWorkerSetupManager() private void initializeTaskRunnerFactory(final JacksonConfigManager configManager)
{
if (workerSetupManager == null) {
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
final ConfigManager configManager = new ConfigManager(dbi, configManagerConfig);
lifecycle.addManagedInstance(configManager);
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
workerSetupManager = new WorkerSetupManager(new JacksonConfigManager(configManager, jsonMapper));
}
lifecycle.addManagedInstance(workerSetupManager);
}
public void initializeTaskRunnerFactory()
{ {
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
if (config.getRunnerImpl().equals("remote")) { if (config.getRunnerImpl().equals("remote")) {
@ -604,7 +588,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec, retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
workerSetupManager configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class)
); );
return remoteTaskRunner; return remoteTaskRunner;
@ -627,7 +611,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
} }
} }
private void initializeResourceManagement() private void initializeResourceManagement(final JacksonConfigManager configManager)
{ {
if (resourceManagementSchedulerFactory == null) { if (resourceManagementSchedulerFactory == null) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
@ -642,6 +626,9 @@ public class IndexerCoordinatorNode extends RegisteringNode
.setNameFormat("ScalingExec--%d") .setNameFormat("ScalingExec--%d")
.build() .build()
); );
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
AutoScalingStrategy strategy; AutoScalingStrategy strategy;
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) { if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
@ -654,7 +641,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
) )
), ),
configFactory.build(EC2AutoScalingStrategyConfig.class), configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupManager workerSetupData
); );
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy(); strategy = new NoopAutoScalingStrategy();
@ -667,7 +654,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
new SimpleResourceManagementStrategy( new SimpleResourceManagementStrategy(
strategy, strategy,
configFactory.build(SimpleResourceManagmentConfig.class), configFactory.build(SimpleResourceManagmentConfig.class),
workerSetupManager workerSetupData
), ),
configFactory.build(ResourceManagementSchedulerConfig.class), configFactory.build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec scalingScheduledExec

View File

@ -19,6 +19,8 @@
package com.metamx.druid.merger.coordinator.http; package com.metamx.druid.merger.coordinator.http;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -26,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionHolder; import com.metamx.druid.merger.common.actions.TaskActionHolder;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
@ -35,10 +38,7 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStats; import com.metamx.druid.merger.coordinator.scaling.ScalingStats;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
@ -49,6 +49,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
@ -61,16 +62,18 @@ public class IndexerCoordinatorResource
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle; private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final WorkerSetupManager workerSetupManager; private final JacksonConfigManager configManager;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private AtomicReference<WorkerSetupData> workerSetupDataRef = null;
@Inject @Inject
public IndexerCoordinatorResource( public IndexerCoordinatorResource(
IndexerCoordinatorConfig config, IndexerCoordinatorConfig config,
ServiceEmitter emitter, ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle, TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskStorageQueryAdapter taskStorageQueryAdapter,
WorkerSetupManager workerSetupManager, JacksonConfigManager configManager,
ObjectMapper jsonMapper ObjectMapper jsonMapper
) throws Exception ) throws Exception
{ {
@ -78,7 +81,7 @@ public class IndexerCoordinatorResource
this.emitter = emitter; this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle; this.taskMasterLifecycle = taskMasterLifecycle;
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.workerSetupManager = workerSetupManager; this.configManager = configManager;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
@ -159,7 +162,11 @@ public class IndexerCoordinatorResource
@Produces("application/json") @Produces("application/json")
public Response getWorkerSetupData() public Response getWorkerSetupData()
{ {
return Response.ok(workerSetupManager.getWorkerSetupData()).build(); if (workerSetupDataRef == null) {
workerSetupDataRef = configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class);
}
return Response.ok(workerSetupDataRef.get()).build();
} }
@POST @POST
@ -169,7 +176,7 @@ public class IndexerCoordinatorResource
final WorkerSetupData workerSetupData final WorkerSetupData workerSetupData
) )
{ {
if (!workerSetupManager.setWorkerSetupData(workerSetupData)) { if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST).build();
} }
return Response.ok().build(); return Response.ok().build();

View File

@ -22,10 +22,10 @@ package com.metamx.druid.merger.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@ -41,7 +41,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle; private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final WorkerSetupManager workerSetupManager; private final JacksonConfigManager configManager;
public IndexerCoordinatorServletModule( public IndexerCoordinatorServletModule(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
@ -49,7 +49,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
ServiceEmitter emitter, ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle, TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter, TaskStorageQueryAdapter taskStorageQueryAdapter,
WorkerSetupManager workerSetupManager JacksonConfigManager configManager
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
@ -57,7 +57,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
this.emitter = emitter; this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle; this.taskMasterLifecycle = taskMasterLifecycle;
this.taskStorageQueryAdapter = taskStorageQueryAdapter; this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.workerSetupManager = workerSetupManager; this.configManager = configManager;
} }
@Override @Override
@ -69,7 +69,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
bind(ServiceEmitter.class).toInstance(emitter); bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle); bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle);
bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter); bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter);
bind(WorkerSetupManager.class).toInstance(workerSetupManager); bind(JacksonConfigManager.class).toInstance(configManager);
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
} }

View File

@ -34,13 +34,13 @@ import com.google.common.collect.Lists;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
@ -51,26 +51,26 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final AmazonEC2Client amazonEC2Client; private final AmazonEC2Client amazonEC2Client;
private final EC2AutoScalingStrategyConfig config; private final EC2AutoScalingStrategyConfig config;
private final WorkerSetupManager workerSetupManager; private final AtomicReference<WorkerSetupData> workerSetupDataRef;
public EC2AutoScalingStrategy( public EC2AutoScalingStrategy(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
AmazonEC2Client amazonEC2Client, AmazonEC2Client amazonEC2Client,
EC2AutoScalingStrategyConfig config, EC2AutoScalingStrategyConfig config,
WorkerSetupManager workerSetupManager AtomicReference<WorkerSetupData> workerSetupDataRef
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.amazonEC2Client = amazonEC2Client; this.amazonEC2Client = amazonEC2Client;
this.config = config; this.config = config;
this.workerSetupManager = workerSetupManager; this.workerSetupDataRef = workerSetupDataRef;
} }
@Override @Override
public AutoScalingData<Instance> provision() public AutoScalingData<Instance> provision()
{ {
try { try {
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); WorkerSetupData setupData = workerSetupDataRef.get();
EC2NodeData workerConfig = setupData.getNodeData(); EC2NodeData workerConfig = setupData.getNodeData();
RunInstancesResult result = amazonEC2Client.runInstances( RunInstancesResult result = amazonEC2Client.runInstances(

View File

@ -27,16 +27,16 @@ import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.ZkWorker; import com.metamx.druid.merger.coordinator.ZkWorker;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
@ -46,7 +46,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private final AutoScalingStrategy autoScalingStrategy; private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagmentConfig config; private final SimpleResourceManagmentConfig config;
private final WorkerSetupManager workerSetupManager; private final AtomicReference<WorkerSetupData> workerSetupdDataRef;
private final ScalingStats scalingStats; private final ScalingStats scalingStats;
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>(); private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
@ -58,12 +58,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public SimpleResourceManagementStrategy( public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy, AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagmentConfig config, SimpleResourceManagmentConfig config,
WorkerSetupManager workerSetupManager AtomicReference<WorkerSetupData> workerSetupdDataRef
) )
{ {
this.autoScalingStrategy = autoScalingStrategy; this.autoScalingStrategy = autoScalingStrategy;
this.config = config; this.config = config;
this.workerSetupManager = workerSetupManager; this.workerSetupdDataRef = workerSetupdDataRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
} }
@ -151,7 +151,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
boolean nothingTerminating = currentlyTerminating.isEmpty(); boolean nothingTerminating = currentlyTerminating.isEmpty();
if (nothingTerminating) { if (nothingTerminating) {
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers(); final int minNumWorkers = workerSetupdDataRef.get().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) { if (zkWorkers.size() <= minNumWorkers) {
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers); log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
return false; return false;

View File

@ -31,6 +31,8 @@ import java.util.List;
*/ */
public class WorkerSetupData public class WorkerSetupData
{ {
public static final String CONFIG_KEY = "worker.setup";
private final String minVersion; private final String minVersion;
private final int minNumWorkers; private final int minNumWorkers;
private final EC2NodeData nodeData; private final EC2NodeData nodeData;

View File

@ -1,59 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.druid.config.JacksonConfigManager;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class WorkerSetupManager
{
private static final String WORKER_SETUP_KEY = "worker.setup";
private final JacksonConfigManager configManager;
private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
public WorkerSetupManager(
JacksonConfigManager configManager
)
{
this.configManager = configManager;
}
@LifecycleStart
public void start()
{
workerSetupData = configManager.watch(WORKER_SETUP_KEY, WorkerSetupData.class);
}
public WorkerSetupData getWorkerSetupData()
{
return workerSetupData.get();
}
public boolean setWorkerSetupData(final WorkerSetupData value)
{
return configManager.set(WORKER_SETUP_KEY, value);
}
}

View File

@ -11,14 +11,12 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.TestTask; import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.TaskMonitor; import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@ -43,6 +41,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static junit.framework.Assert.fail; import static junit.framework.Assert.fail;
@ -61,7 +60,6 @@ public class RemoteTaskRunnerTest
private PathChildrenCache pathChildrenCache; private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner; private RemoteTaskRunner remoteTaskRunner;
private TaskMonitor taskMonitor; private TaskMonitor taskMonitor;
private WorkerSetupManager workerSetupManager;
private ScheduledExecutorService scheduledExec; private ScheduledExecutorService scheduledExec;
@ -317,17 +315,6 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception private void makeRemoteTaskRunner() throws Exception
{ {
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
new WorkerSetupData(
"0",
0,
null,
null
)
).atLeastOnce();
EasyMock.replay(workerSetupManager);
remoteTaskRunner = new RemoteTaskRunner( remoteTaskRunner = new RemoteTaskRunner(
jsonMapper, jsonMapper,
@ -336,7 +323,7 @@ public class RemoteTaskRunnerTest
pathChildrenCache, pathChildrenCache,
scheduledExec, scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()), new RetryPolicyFactory(new TestRetryPolicyConfig()),
workerSetupManager new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, null, null))
); );
// Create a single worker and wait for things for be ready // Create a single worker and wait for things for be ready

View File

@ -33,7 +33,6 @@ import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.GalaxyUserData; import com.metamx.druid.merger.coordinator.setup.GalaxyUserData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -42,6 +41,7 @@ import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
@ -57,7 +57,7 @@ public class EC2AutoScalingStrategyTest
private Reservation reservation; private Reservation reservation;
private Instance instance; private Instance instance;
private EC2AutoScalingStrategy strategy; private EC2AutoScalingStrategy strategy;
private WorkerSetupManager workerSetupManager; private AtomicReference<WorkerSetupData> workerSetupData;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -66,7 +66,7 @@ public class EC2AutoScalingStrategyTest
runInstancesResult = EasyMock.createMock(RunInstancesResult.class); runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
reservation = EasyMock.createMock(Reservation.class); reservation = EasyMock.createMock(Reservation.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); workerSetupData = new AtomicReference<WorkerSetupData>(null);
instance = new Instance() instance = new Instance()
.withInstanceId(INSTANCE_ID) .withInstanceId(INSTANCE_ID)
@ -85,7 +85,7 @@ public class EC2AutoScalingStrategyTest
return "8080"; return "8080";
} }
}, },
workerSetupManager workerSetupData
); );
} }
@ -96,13 +96,12 @@ public class EC2AutoScalingStrategyTest
EasyMock.verify(runInstancesResult); EasyMock.verify(runInstancesResult);
EasyMock.verify(describeInstancesResult); EasyMock.verify(describeInstancesResult);
EasyMock.verify(reservation); EasyMock.verify(reservation);
EasyMock.verify(workerSetupManager);
} }
@Test @Test
public void testScale() public void testScale()
{ {
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( workerSetupData.set(
new WorkerSetupData( new WorkerSetupData(
"0", "0",
0, 0,
@ -110,7 +109,6 @@ public class EC2AutoScalingStrategyTest
new GalaxyUserData("env", "version", "type") new GalaxyUserData("env", "version", "type")
) )
); );
EasyMock.replay(workerSetupManager);
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
runInstancesResult runInstancesResult

View File

@ -29,7 +29,6 @@ import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.ZkWorker; import com.metamx.druid.merger.coordinator.ZkWorker;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import junit.framework.Assert; import junit.framework.Assert;
import org.easymock.EasyMock; import org.easymock.EasyMock;
@ -42,21 +41,22 @@ import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/** /**
*/ */
public class SimpleResourceManagementStrategyTest public class SimpleResourceManagementStrategyTest
{ {
private AutoScalingStrategy autoScalingStrategy; private AutoScalingStrategy autoScalingStrategy;
private WorkerSetupManager workerSetupManager;
private Task testTask; private Task testTask;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy; private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
private AtomicReference<WorkerSetupData> workerSetupData;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<WorkerSetupData>(null);
testTask = new TestTask( testTask = new TestTask(
"task1", "task1",
@ -105,7 +105,7 @@ public class SimpleResourceManagementStrategyTest
return new Duration(0); return new Duration(0);
} }
}, },
workerSetupManager workerSetupData
); );
} }
@ -187,8 +187,7 @@ public class SimpleResourceManagementStrategyTest
@Test @Test
public void testDoSuccessfulTerminate() throws Exception public void testDoSuccessfulTerminate() throws Exception
{ {
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(new WorkerSetupData("0", 0, null, null)); workerSetupData.set(new WorkerSetupData("0", 0, null, null));
EasyMock.replay(workerSetupManager);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
@ -212,15 +211,13 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
); );
EasyMock.verify(workerSetupManager);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScalingStrategy);
} }
@Test @Test
public void testSomethingTerminating() throws Exception public void testSomethingTerminating() throws Exception
{ {
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(new WorkerSetupData("0", 0, null, null)); workerSetupData.set(new WorkerSetupData("0", 0, null, null));
EasyMock.replay(workerSetupManager);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2); .andReturn(Lists.<String>newArrayList("ip")).times(2);
@ -259,7 +256,6 @@ public class SimpleResourceManagementStrategyTest
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE
); );
EasyMock.verify(workerSetupManager);
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScalingStrategy);
} }

View File

@ -38,7 +38,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.20.1-SNAPSHOT</metamx.java-util.version> <metamx.java-util.version>0.21.0</metamx.java-util.version>
</properties> </properties>
<modules> <modules>