diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManager.java b/common/src/main/java/com/metamx/druid/config/ConfigManager.java index 3073c13e9fb..1ecfd24482c 100644 --- a/common/src/main/java/com/metamx/druid/config/ConfigManager.java +++ b/common/src/main/java/com/metamx/druid/config/ConfigManager.java @@ -1,6 +1,5 @@ package com.metamx.druid.config; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.metamx.common.concurrent.ScheduledExecutors; @@ -40,6 +39,8 @@ public class ConfigManager private final ConcurrentMap watchedConfigs; private final String selectStatement; + private volatile ConfigManager.PollingCallable poller; + public ConfigManager(IDBI dbi, ConfigManagerConfig config) { this.dbi = dbi; @@ -58,19 +59,8 @@ public class ConfigManager return; } - ScheduledExecutors.scheduleWithFixedDelay( - exec, - new Duration(0), - config.getPollDuration(), - new Runnable() - { - @Override - public void run() - { - poll(); - } - } - ); + poller = new PollingCallable(); + ScheduledExecutors.scheduleWithFixedDelay(exec, new Duration(0), config.getPollDuration(), poller); started = true; } @@ -84,6 +74,9 @@ public class ConfigManager return; } + poller.stop(); + poller = null; + started = false; } } @@ -119,8 +112,7 @@ public class ConfigManager { if (!started) { watchedConfigs.put(key, new ConfigHolder(null, serde)); - } - else { + } else { try { // Multiple of these callables can be submitted at the same time, but the callables themselves // are executed serially, so double check that it hasn't already been populated. @@ -200,10 +192,12 @@ public class ConfigManager @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement("INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload") - .bind("name", key) - .bind("payload", newBytes) - .execute(); + handle.createStatement( + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload" + ) + .bind("name", key) + .bind("payload", newBytes) + .execute(); return null; } } @@ -256,4 +250,25 @@ public class ConfigManager return false; } } + + private class PollingCallable implements Callable + { + private volatile boolean stop = false; + + void stop() + { + stop = true; + } + + @Override + public ScheduledExecutors.Signal call() throws Exception + { + if (stop) { + return ScheduledExecutors.Signal.STOP; + } + + poll(); + return ScheduledExecutors.Signal.REPEAT; + } + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index e4a13136152..b1ed92087bc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -35,7 +35,7 @@ import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.netflix.curator.framework.CuratorFramework; @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure @@ -82,7 +83,7 @@ public class RemoteTaskRunner implements TaskRunner private final PathChildrenCache workerPathCache; private final ScheduledExecutorService scheduledExec; private final RetryPolicyFactory retryPolicyFactory; - private final WorkerSetupManager workerSetupManager; + private final AtomicReference workerSetupData; // all workers that exist in ZK private final Map zkWorkers = new ConcurrentHashMap(); @@ -104,7 +105,7 @@ public class RemoteTaskRunner implements TaskRunner PathChildrenCache workerPathCache, ScheduledExecutorService scheduledExec, RetryPolicyFactory retryPolicyFactory, - WorkerSetupManager workerSetupManager + AtomicReference workerSetupData ) { this.jsonMapper = jsonMapper; @@ -113,7 +114,7 @@ public class RemoteTaskRunner implements TaskRunner this.workerPathCache = workerPathCache; this.scheduledExec = scheduledExec; this.retryPolicyFactory = retryPolicyFactory; - this.workerSetupManager = workerSetupManager; + this.workerSetupData = workerSetupData; } @LifecycleStart @@ -548,7 +549,7 @@ public class RemoteTaskRunner implements TaskRunner return (!input.isAtCapacity() && input.getWorker() .getVersion() - .compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0); + .compareTo(workerSetupData.get().getMinVersion()) >= 0); } } ) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 4c6d47dbad2..e717e3f9267 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -53,9 +53,9 @@ import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.S3DataSegmentKiller; -import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.TaskActionToolbox; @@ -88,7 +88,7 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerCo import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy; import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -122,6 +122,7 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -155,7 +156,6 @@ public class IndexerCoordinatorNode extends RegisteringNode private CuratorFramework curatorFramework = null; private ScheduledExecutorFactory scheduledExecutorFactory = null; private IndexerZkConfig indexerZkConfig; - private WorkerSetupManager workerSetupManager = null; private TaskRunnerFactory taskRunnerFactory = null; private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null; private TaskMasterLifecycle taskMasterLifecycle = null; @@ -226,12 +226,6 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } - public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager) - { - this.workerSetupManager = workerSetupManager; - return this; - } - public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { this.taskRunnerFactory = taskRunnerFactory; @@ -248,6 +242,10 @@ public class IndexerCoordinatorNode extends RegisteringNode { scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); + final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class); + DbConnector.createConfigTable(dbi, managerConfig.getConfigTable()); + JacksonConfigManager configManager = new JacksonConfigManager(new ConfigManager(dbi, managerConfig), jsonMapper); + initializeEmitter(); initializeMonitors(); initializeDB(); @@ -264,9 +262,8 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeJacksonSubtypes(); initializeCurator(); initializeIndexerZkConfig(); - initializeWorkerSetupManager(); - initializeTaskRunnerFactory(); - initializeResourceManagement(); + initializeTaskRunnerFactory(configManager); + initializeResourceManagement(configManager); initializeTaskMasterLifecycle(); initializeServer(); @@ -286,7 +283,7 @@ public class IndexerCoordinatorNode extends RegisteringNode emitter, taskMasterLifecycle, new TaskStorageQueryAdapter(taskStorage), - workerSetupManager + configManager ) ); @@ -555,20 +552,7 @@ public class IndexerCoordinatorNode extends RegisteringNode } } - public void initializeWorkerSetupManager() - { - if (workerSetupManager == null) { - final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class); - final ConfigManager configManager = new ConfigManager(dbi, configManagerConfig); - lifecycle.addManagedInstance(configManager); - - DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable()); - workerSetupManager = new WorkerSetupManager(new JacksonConfigManager(configManager, jsonMapper)); - } - lifecycle.addManagedInstance(workerSetupManager); - } - - public void initializeTaskRunnerFactory() + private void initializeTaskRunnerFactory(final JacksonConfigManager configManager) { if (taskRunnerFactory == null) { if (config.getRunnerImpl().equals("remote")) { @@ -594,7 +578,7 @@ public class IndexerCoordinatorNode extends RegisteringNode new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), retryScheduledExec, new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), - workerSetupManager + configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class) ); return remoteTaskRunner; @@ -617,7 +601,7 @@ public class IndexerCoordinatorNode extends RegisteringNode } } - private void initializeResourceManagement() + private void initializeResourceManagement(final JacksonConfigManager configManager) { if (resourceManagementSchedulerFactory == null) { resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() @@ -632,6 +616,9 @@ public class IndexerCoordinatorNode extends RegisteringNode .setNameFormat("ScalingExec--%d") .build() ); + final AtomicReference workerSetupData = configManager.watch( + WorkerSetupData.CONFIG_KEY, WorkerSetupData.class + ); AutoScalingStrategy strategy; if (config.getStrategyImpl().equalsIgnoreCase("ec2")) { @@ -644,7 +631,7 @@ public class IndexerCoordinatorNode extends RegisteringNode ) ), configFactory.build(EC2AutoScalingStrategyConfig.class), - workerSetupManager + workerSetupData ); } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { strategy = new NoopAutoScalingStrategy(); @@ -657,7 +644,7 @@ public class IndexerCoordinatorNode extends RegisteringNode new SimpleResourceManagementStrategy( strategy, configFactory.build(SimpleResourceManagmentConfig.class), - workerSetupManager + workerSetupData ), configFactory.build(ResourceManagementSchedulerConfig.class), scalingScheduledExec diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 762d9ad28ee..fe5432cc713 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -19,12 +19,15 @@ package com.metamx.druid.merger.coordinator.http; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.actions.TaskActionHolder; import com.metamx.druid.merger.common.task.Task; @@ -32,10 +35,7 @@ import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.type.TypeReference; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -46,6 +46,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.Response; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -58,16 +59,18 @@ public class IndexerCoordinatorResource private final ServiceEmitter emitter; private final TaskMasterLifecycle taskMasterLifecycle; private final TaskStorageQueryAdapter taskStorageQueryAdapter; - private final WorkerSetupManager workerSetupManager; + private final JacksonConfigManager configManager; private final ObjectMapper jsonMapper; + private AtomicReference workerSetupDataRef = null; + @Inject public IndexerCoordinatorResource( IndexerCoordinatorConfig config, ServiceEmitter emitter, TaskMasterLifecycle taskMasterLifecycle, TaskStorageQueryAdapter taskStorageQueryAdapter, - WorkerSetupManager workerSetupManager, + JacksonConfigManager configManager, ObjectMapper jsonMapper ) throws Exception { @@ -75,7 +78,7 @@ public class IndexerCoordinatorResource this.emitter = emitter; this.taskMasterLifecycle = taskMasterLifecycle; this.taskStorageQueryAdapter = taskStorageQueryAdapter; - this.workerSetupManager = workerSetupManager; + this.configManager = configManager; this.jsonMapper = jsonMapper; } @@ -152,7 +155,11 @@ public class IndexerCoordinatorResource @Produces("application/json") public Response getWorkerSetupData() { - return Response.ok(workerSetupManager.getWorkerSetupData()).build(); + if (workerSetupDataRef == null) { + workerSetupDataRef = configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class); + } + + return Response.ok(workerSetupDataRef.get()).build(); } @POST @@ -162,7 +169,7 @@ public class IndexerCoordinatorResource final WorkerSetupData workerSetupData ) { - if (!workerSetupManager.setWorkerSetupData(workerSetupData)) { + if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) { return Response.status(Response.Status.BAD_REQUEST).build(); } return Response.ok().build(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java index f5bdfd6cbb5..1024d921668 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorServletModule.java @@ -22,10 +22,10 @@ package com.metamx.druid.merger.coordinator.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.inject.Provides; +import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.service.ServiceEmitter; import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; @@ -41,7 +41,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule private final ServiceEmitter emitter; private final TaskMasterLifecycle taskMasterLifecycle; private final TaskStorageQueryAdapter taskStorageQueryAdapter; - private final WorkerSetupManager workerSetupManager; + private final JacksonConfigManager configManager; public IndexerCoordinatorServletModule( ObjectMapper jsonMapper, @@ -49,7 +49,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule ServiceEmitter emitter, TaskMasterLifecycle taskMasterLifecycle, TaskStorageQueryAdapter taskStorageQueryAdapter, - WorkerSetupManager workerSetupManager + JacksonConfigManager configManager ) { this.jsonMapper = jsonMapper; @@ -57,7 +57,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule this.emitter = emitter; this.taskMasterLifecycle = taskMasterLifecycle; this.taskStorageQueryAdapter = taskStorageQueryAdapter; - this.workerSetupManager = workerSetupManager; + this.configManager = configManager; } @Override @@ -70,7 +70,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule bind(ServiceEmitter.class).toInstance(emitter); bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle); bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter); - bind(WorkerSetupManager.class).toInstance(workerSetupManager); + bind(JacksonConfigManager.class).toInstance(configManager); serve("/*").with(GuiceContainer.class); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index 47ef22152dd..f9ecbb06f70 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -34,13 +34,13 @@ import com.google.common.collect.Lists; import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.emitter.EmittingLogger; import org.apache.commons.codec.binary.Base64; import javax.annotation.Nullable; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -51,26 +51,26 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy private final ObjectMapper jsonMapper; private final AmazonEC2Client amazonEC2Client; private final EC2AutoScalingStrategyConfig config; - private final WorkerSetupManager workerSetupManager; + private final AtomicReference workerSetupDataRef; public EC2AutoScalingStrategy( ObjectMapper jsonMapper, AmazonEC2Client amazonEC2Client, EC2AutoScalingStrategyConfig config, - WorkerSetupManager workerSetupManager + AtomicReference workerSetupDataRef ) { this.jsonMapper = jsonMapper; this.amazonEC2Client = amazonEC2Client; this.config = config; - this.workerSetupManager = workerSetupManager; + this.workerSetupDataRef = workerSetupDataRef; } @Override public AutoScalingData provision() { try { - WorkerSetupData setupData = workerSetupManager.getWorkerSetupData(); + WorkerSetupData setupData = workerSetupDataRef.get(); EC2NodeData workerConfig = setupData.getNodeData(); RunInstancesResult result = amazonEC2Client.runInstances( diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 05c2f2f0b26..082870c83c8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -27,16 +27,16 @@ import com.google.common.collect.Sets; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.ZkWorker; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.joda.time.Duration; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -46,7 +46,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private final AutoScalingStrategy autoScalingStrategy; private final SimpleResourceManagmentConfig config; - private final WorkerSetupManager workerSetupManager; + private final AtomicReference workerSetupdDataRef; private final ScalingStats scalingStats; private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet(); @@ -58,12 +58,12 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat public SimpleResourceManagementStrategy( AutoScalingStrategy autoScalingStrategy, SimpleResourceManagmentConfig config, - WorkerSetupManager workerSetupManager + AtomicReference workerSetupdDataRef ) { this.autoScalingStrategy = autoScalingStrategy; this.config = config; - this.workerSetupManager = workerSetupManager; + this.workerSetupdDataRef = workerSetupdDataRef; this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); } @@ -151,7 +151,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat boolean nothingTerminating = currentlyTerminating.isEmpty(); if (nothingTerminating) { - final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers(); + final int minNumWorkers = workerSetupdDataRef.get().getMinNumWorkers(); if (zkWorkers.size() <= minNumWorkers) { log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers); return false; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index fada73cb40e..18cd85e6962 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -31,6 +31,8 @@ import java.util.List; */ public class WorkerSetupData { + public static final String CONFIG_KEY = "worker.setup"; + private final String minVersion; private final int minNumWorkers; private final EC2NodeData nodeData; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java deleted file mode 100644 index b9ce066c327..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.coordinator.setup; - -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.druid.config.JacksonConfigManager; - -import java.util.concurrent.atomic.AtomicReference; - -/** - */ -public class WorkerSetupManager -{ - private static final String WORKER_SETUP_KEY = "worker.setup"; - - private final JacksonConfigManager configManager; - - private volatile AtomicReference workerSetupData = new AtomicReference(null); - - public WorkerSetupManager( - JacksonConfigManager configManager - ) - { - this.configManager = configManager; - } - - @LifecycleStart - public void start() - { - workerSetupData = configManager.watch(WORKER_SETUP_KEY, WorkerSetupData.class); - } - - public WorkerSetupData getWorkerSetupData() - { - return workerSetupData.get(); - } - - public boolean setWorkerSetupData(final WorkerSetupData value) - { - return configManager.set(WORKER_SETUP_KEY, value); - } -} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index 18ce14a4555..d88ac044aed 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -11,14 +11,12 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.merger.TestTask; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.TaskMonitor; import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; @@ -43,6 +41,7 @@ import org.junit.Test; import java.io.File; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; import static junit.framework.Assert.fail; @@ -61,7 +60,6 @@ public class RemoteTaskRunnerTest private PathChildrenCache pathChildrenCache; private RemoteTaskRunner remoteTaskRunner; private TaskMonitor taskMonitor; - private WorkerSetupManager workerSetupManager; private ScheduledExecutorService scheduledExec; @@ -317,17 +315,6 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); - workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); - - EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( - new WorkerSetupData( - "0", - 0, - null, - null - ) - ).atLeastOnce(); - EasyMock.replay(workerSetupManager); remoteTaskRunner = new RemoteTaskRunner( jsonMapper, @@ -336,7 +323,7 @@ public class RemoteTaskRunnerTest pathChildrenCache, scheduledExec, new RetryPolicyFactory(new TestRetryPolicyConfig()), - workerSetupManager + new AtomicReference(new WorkerSetupData("0", 0, null, null)) ); // Create a single worker and wait for things for be ready diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java index c3aa8378b07..cd569cb77e8 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -33,7 +33,6 @@ import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.setup.EC2NodeData; import com.metamx.druid.merger.coordinator.setup.GalaxyUserData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -42,6 +41,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -57,7 +57,7 @@ public class EC2AutoScalingStrategyTest private Reservation reservation; private Instance instance; private EC2AutoScalingStrategy strategy; - private WorkerSetupManager workerSetupManager; + private AtomicReference workerSetupData; @Before public void setUp() throws Exception @@ -66,7 +66,7 @@ public class EC2AutoScalingStrategyTest runInstancesResult = EasyMock.createMock(RunInstancesResult.class); describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class); reservation = EasyMock.createMock(Reservation.class); - workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); + workerSetupData = new AtomicReference(null); instance = new Instance() .withInstanceId(INSTANCE_ID) @@ -85,7 +85,7 @@ public class EC2AutoScalingStrategyTest return "8080"; } }, - workerSetupManager + workerSetupData ); } @@ -96,13 +96,12 @@ public class EC2AutoScalingStrategyTest EasyMock.verify(runInstancesResult); EasyMock.verify(describeInstancesResult); EasyMock.verify(reservation); - EasyMock.verify(workerSetupManager); } @Test public void testScale() { - EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn( + workerSetupData.set( new WorkerSetupData( "0", 0, @@ -110,7 +109,6 @@ public class EC2AutoScalingStrategyTest new GalaxyUserData("env", "version", "type") ) ); - EasyMock.replay(workerSetupManager); EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn( runInstancesResult diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 742525d38a7..2052ae014bb 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -29,7 +29,6 @@ import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.ZkWorker; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; -import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.Worker; import junit.framework.Assert; import org.easymock.EasyMock; @@ -42,21 +41,22 @@ import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; /** */ public class SimpleResourceManagementStrategyTest { private AutoScalingStrategy autoScalingStrategy; - private WorkerSetupManager workerSetupManager; private Task testTask; private SimpleResourceManagementStrategy simpleResourceManagementStrategy; + private AtomicReference workerSetupData; @Before public void setUp() throws Exception { - workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); + workerSetupData = new AtomicReference(null); testTask = new TestTask( "task1", @@ -105,7 +105,7 @@ public class SimpleResourceManagementStrategyTest return new Duration(0); } }, - workerSetupManager + workerSetupData ); } @@ -187,8 +187,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testDoSuccessfulTerminate() throws Exception { - EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(new WorkerSetupData("0", 0, null, null)); - EasyMock.replay(workerSetupManager); + workerSetupData.set(new WorkerSetupData("0", 0, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()); @@ -212,15 +211,13 @@ public class SimpleResourceManagementStrategyTest simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); - EasyMock.verify(workerSetupManager); EasyMock.verify(autoScalingStrategy); } @Test public void testSomethingTerminating() throws Exception { - EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(new WorkerSetupData("0", 0, null, null)); - EasyMock.replay(workerSetupManager); + workerSetupData.set(new WorkerSetupData("0", 0, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")).times(2); @@ -259,7 +256,6 @@ public class SimpleResourceManagementStrategyTest simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE ); - EasyMock.verify(workerSetupManager); EasyMock.verify(autoScalingStrategy); } diff --git a/pom.xml b/pom.xml index f61f80c1ff4..d430a91ae5d 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ UTF-8 - 0.20.1-SNAPSHOT + 0.21.0