toggle between compressed and non compressed service discovery

This commit is contained in:
fjy 2013-07-29 15:40:45 -07:00
parent b0090a1de6
commit 50836798fa
4 changed files with 33 additions and 20 deletions

View File

@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
@Config("druid.zk.paths.discoveryPath") @Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath(); public abstract String getDiscoveryPath();
@Override
@Config("druid.service.discovery.curator.compression.enable")
public boolean enableCompression()
{
return false;
}
} }

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
@ -44,6 +43,17 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServerInit;
@ -52,17 +62,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters; import com.metamx.emitter.core.Emitters;
@ -70,11 +69,10 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit; import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor; import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig; import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
@ -86,7 +84,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -350,8 +347,9 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
{ {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
if (serviceDiscovery == null) { if (serviceDiscovery == null) {
final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, lifecycle serviceDiscoveryCuratorFramework, config, lifecycle
); );
} }
if (serviceAnnouncer == null) { if (serviceAnnouncer == null) {

View File

@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode<WorkerNode>
{ {
if (serviceDiscovery == null) { if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
getCuratorFramework(),
config, config,
getLifecycle() getLifecycle()
); );
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
serviceDiscoveryCuratorFramework, config, getLifecycle()
);
} }
if (coordinatorServiceProvider == null) { if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider( this.coordinatorServiceProvider = Initialization.makeServiceProvider(

View File

@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
@ -124,10 +125,15 @@ public class MasterMain
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFramework( CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
serviceDiscoveryConfig, serviceDiscoveryConfig,
lifecycle lifecycle
); );
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
curatorConfig,
lifecycle
);
final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class); final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
@ -201,7 +207,7 @@ public class MasterMain
final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class); final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, serviceDiscoveryCuratorFramework,
serviceDiscoveryConfig, serviceDiscoveryConfig,
lifecycle lifecycle
); );