From 50836798faa3355339a4dfc592b97e334b6bfbe2 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 15:40:45 -0700 Subject: [PATCH 1/2] toggle between compressed and non compressed service discovery --- .../ServiceDiscoveryConfig.java | 7 +++++ .../worker/executor/ExecutorNode.java | 30 +++++++++---------- .../indexing/worker/http/WorkerNode.java | 6 ++-- .../com/metamx/druid/http/MasterMain.java | 10 +++++-- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java index 04776d6545a..2d828cac9fd 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java @@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig @Config("druid.zk.paths.discoveryPath") public abstract String getDiscoveryPath(); + + @Override + @Config("druid.service.discovery.curator.compression.enable") + public boolean enableCompression() + { + return false; + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index c5692e59d58..20428fe269c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -44,6 +43,17 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.indexing.common.RetryPolicyFactory; +import com.metamx.druid.indexing.common.TaskToolboxFactory; +import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import com.metamx.druid.indexing.common.config.RetryPolicyConfig; +import com.metamx.druid.indexing.common.config.TaskConfig; +import com.metamx.druid.indexing.common.index.ChatHandlerProvider; +import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; +import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; +import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; +import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; +import com.metamx.druid.indexing.worker.config.WorkerConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerInit; @@ -52,17 +62,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.S3DataSegmentKiller; -import com.metamx.druid.indexing.common.RetryPolicyFactory; -import com.metamx.druid.indexing.common.TaskToolboxFactory; -import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; -import com.metamx.druid.indexing.common.config.TaskConfig; -import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; -import com.metamx.druid.indexing.common.index.ChatHandlerProvider; -import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; -import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; -import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; -import com.metamx.druid.indexing.worker.config.WorkerConfig; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -70,11 +69,10 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceProvider; import org.jets3t.service.S3ServiceException; @@ -86,7 +84,6 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; -import java.util.List; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -350,8 +347,9 @@ public class ExecutorNode extends BaseServerNode { final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); if (serviceDiscovery == null) { + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), config, lifecycle + serviceDiscoveryCuratorFramework, config, lifecycle ); } if (serviceAnnouncer == null) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index 1ccf32e0795..dda7afe7d78 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode { if (serviceDiscovery == null) { final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); - this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework( config, getLifecycle() ); + this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( + serviceDiscoveryCuratorFramework, config, getLifecycle() + ); } if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 63521717e79..c8edf93dbbf 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; @@ -124,10 +125,15 @@ public class MasterMain final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); - CuratorFramework curatorFramework = Initialization.makeCuratorFramework( + CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework( serviceDiscoveryConfig, lifecycle ); + final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class); + CuratorFramework curatorFramework = Initialization.makeCuratorFramework( + curatorConfig, + lifecycle + ); final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class); @@ -201,7 +207,7 @@ public class MasterMain final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class); final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( - curatorFramework, + serviceDiscoveryCuratorFramework, serviceDiscoveryConfig, lifecycle ); From af6db21264857017508f8c6308c050e6693a6b7d Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 17:04:27 -0700 Subject: [PATCH 2/2] fix configs based on code review --- .../java/com/metamx/druid/initialization/CuratorConfig.java | 2 +- .../com/metamx/druid/initialization/ServiceDiscoveryConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java index 741bc59d3d9..be46aea41f6 100644 --- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java @@ -33,7 +33,7 @@ public abstract class CuratorConfig @Default("30000") public abstract int getZkSessionTimeoutMs(); - @Config("druid.curator.compression.enable") + @Config("druid.curator.compress") @Default("false") public abstract boolean enableCompression(); } diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java index 2d828cac9fd..7d0c20cd7ef 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java @@ -38,7 +38,7 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig public abstract String getDiscoveryPath(); @Override - @Config("druid.service.discovery.curator.compression.enable") + @Config("druid.curator.discovery.compress") public boolean enableCompression() { return false;