From 2f56c24259b0ea7743429956f0e90cdc8c067f79 Mon Sep 17 00:00:00 2001 From: cheddar Date: Fri, 7 Jun 2013 17:37:33 -0700 Subject: [PATCH] 1) Inject IndexingServiceClient 2) Switch all the DBI references to IDBI --- .../client/indexing/IndexingService.java | 17 +++++ .../indexing/IndexingServiceClient.java | 15 ++-- .../client/selector/DiscoverySelector.java | 8 +++ .../metamx/druid/client/selector/Server.java | 9 +++ .../druid/client/selector/ServerSelector.java | 3 +- .../druid/initialization/EmitterModule.java | 3 +- .../metamx/druid/config/ConfigManager.java | 4 +- .../druid/config/JacksonConfigManager.java | 2 + .../java/com/metamx/druid/db/DbConnector.java | 15 ++-- .../com/metamx/druid/guice/HttpModule.java | 40 ----------- .../metamx/druid/guice/LifecycleScope.java | 2 +- .../druid/guice/annotations/Global.java | 17 +++++ .../metamx/druid/indexer/DbUpdaterJob.java | 4 +- .../indexing/coordinator/DbTaskStorage.java | 9 ++- .../coordinator/MergerDBCoordinator.java | 6 +- .../http/IndexerCoordinatorNode.java | 4 +- .../druid/realtime/DbSegmentPublisher.java | 6 +- .../indexing/IndexingServiceSelector.java | 68 +++++++++++++++++++ .../metamx/druid/db/DatabaseRuleManager.java | 8 +-- .../druid/db/DatabaseSegmentManager.java | 6 +- .../metamx/druid/guice/HttpClientModule.java | 65 ++++++++++++++++++ .../com/metamx/druid/guice/MasterModule.java | 29 ++++++-- .../com/metamx/druid/http/MasterMain.java | 47 ++++--------- .../druid/db/DatabaseSegmentManagerTest.java | 6 +- 24 files changed, 268 insertions(+), 125 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/client/indexing/IndexingService.java create mode 100644 client/src/main/java/com/metamx/druid/client/selector/DiscoverySelector.java create mode 100644 client/src/main/java/com/metamx/druid/client/selector/Server.java delete mode 100644 common/src/main/java/com/metamx/druid/guice/HttpModule.java create mode 100644 common/src/main/java/com/metamx/druid/guice/annotations/Global.java create mode 100644 server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java create mode 100644 server/src/main/java/com/metamx/druid/guice/HttpClientModule.java diff --git a/client/src/main/java/com/metamx/druid/client/indexing/IndexingService.java b/client/src/main/java/com/metamx/druid/client/indexing/IndexingService.java new file mode 100644 index 00000000000..4198703ee63 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/indexing/IndexingService.java @@ -0,0 +1,17 @@ +package com.metamx.druid.client.indexing; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface IndexingService +{ +} diff --git a/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java index f4be29681c3..e3de082f7fc 100644 --- a/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java +++ b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java @@ -25,10 +25,11 @@ import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.selector.DiscoverySelector; +import com.metamx.druid.client.selector.Server; +import com.metamx.druid.guice.annotations.Global; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; import org.joda.time.Interval; import java.io.InputStream; @@ -42,13 +43,13 @@ public class IndexingServiceClient private final HttpClient client; private final ObjectMapper jsonMapper; - private final ServiceProvider serviceProvider; + private final DiscoverySelector serviceProvider; @Inject public IndexingServiceClient( - HttpClient client, + @Global HttpClient client, ObjectMapper jsonMapper, - ServiceProvider serviceProvider + @IndexingService DiscoverySelector serviceProvider ) { this.client = client; @@ -105,12 +106,12 @@ public class IndexingServiceClient private String baseUrl() { try { - final ServiceInstance instance = serviceProvider.getInstance(); + final Server instance = serviceProvider.pick(); if (instance == null) { throw new ISE("Cannot find instance of indexingService"); } - return String.format("http://%s:%s/druid/indexer/v1", instance.getAddress(), instance.getPort()); + return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/client/src/main/java/com/metamx/druid/client/selector/DiscoverySelector.java b/client/src/main/java/com/metamx/druid/client/selector/DiscoverySelector.java new file mode 100644 index 00000000000..1b55472e431 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/selector/DiscoverySelector.java @@ -0,0 +1,8 @@ +package com.metamx.druid.client.selector; + +/** + */ +public interface DiscoverySelector +{ + public T pick(); +} diff --git a/client/src/main/java/com/metamx/druid/client/selector/Server.java b/client/src/main/java/com/metamx/druid/client/selector/Server.java new file mode 100644 index 00000000000..9859c888239 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/selector/Server.java @@ -0,0 +1,9 @@ +package com.metamx.druid.client.selector; + +/** + */ +public interface Server +{ + public String getHost(); + public int getPort(); +} diff --git a/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java b/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java index 388761aaaa2..d4876e5680f 100644 --- a/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java +++ b/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java @@ -22,7 +22,6 @@ package com.metamx.druid.client.selector; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.client.DruidServer; import java.util.Collections; import java.util.Comparator; @@ -30,7 +29,7 @@ import java.util.Set; /** */ -public class ServerSelector +public class ServerSelector implements DiscoverySelector { private static final Comparator comparator = new Comparator() { diff --git a/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java b/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java index edc01e65a26..408358c2283 100644 --- a/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java +++ b/client/src/main/java/com/metamx/druid/initialization/EmitterModule.java @@ -32,7 +32,6 @@ import com.google.inject.name.Named; import com.google.inject.name.Names; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; -import com.metamx.druid.guice.DruidScopes; import com.metamx.druid.guice.LazySingleton; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitter; @@ -66,7 +65,7 @@ public class EmitterModule implements Module binder.install(new LogEmitterModule()); binder.install(new HttpEmitterModule()); - binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(DruidScopes.SINGLETON); + binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class); } @Provides diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManager.java b/common/src/main/java/com/metamx/druid/config/ConfigManager.java index 4aa97e7ffc7..14b504a60ec 100644 --- a/common/src/main/java/com/metamx/druid/config/ConfigManager.java +++ b/common/src/main/java/com/metamx/druid/config/ConfigManager.java @@ -2,6 +2,7 @@ package com.metamx.druid.config; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.google.inject.Inject; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -42,7 +43,8 @@ public class ConfigManager private volatile ConfigManager.PollingCallable poller; - public ConfigManager(IDBI dbi, ConfigManagerConfig config) + @Inject + public ConfigManager(IDBI dbi, ConfigManagerConfig config) // TODO: use DbTables and a different config { this.dbi = dbi; this.config = config; diff --git a/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java b/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java index 8e322f3ee80..13ed516fe62 100644 --- a/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java +++ b/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.google.inject.Inject; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; @@ -34,6 +35,7 @@ public class JacksonConfigManager private final ConfigManager configManager; private final ObjectMapper jsonMapper; + @Inject public JacksonConfigManager( ConfigManager configManager, ObjectMapper jsonMapper diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index d6b0b3e47f2..1cfc9904b09 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -24,6 +24,7 @@ import com.metamx.common.logger.Logger; import org.apache.commons.dbcp.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import javax.sql.DataSource; @@ -36,7 +37,7 @@ public class DbConnector { private static final Logger log = new Logger(DbConnector.class); - public static void createSegmentTable(final DBI dbi, final String segmentTableName) + public static void createSegmentTable(final IDBI dbi, final String segmentTableName) { createTable( dbi, @@ -48,7 +49,7 @@ public class DbConnector ); } - public static void createRuleTable(final DBI dbi, final String ruleTableName) + public static void createRuleTable(final IDBI dbi, final String ruleTableName) { createTable( dbi, @@ -60,7 +61,7 @@ public class DbConnector ); } - public static void createConfigTable(final DBI dbi, final String configTableName) + public static void createConfigTable(final IDBI dbi, final String configTableName) { createTable( dbi, @@ -72,7 +73,7 @@ public class DbConnector ); } - public static void createTaskTable(final DBI dbi, final String taskTableName) + public static void createTaskTable(final IDBI dbi, final String taskTableName) { createTable( dbi, @@ -92,7 +93,7 @@ public class DbConnector ); } - public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName) + public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName) { createTable( dbi, @@ -110,7 +111,7 @@ public class DbConnector ); } - public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName) + public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName) { createTable( dbi, @@ -129,7 +130,7 @@ public class DbConnector } public static void createTable( - final DBI dbi, + final IDBI dbi, final String tableName, final String sql ) diff --git a/common/src/main/java/com/metamx/druid/guice/HttpModule.java b/common/src/main/java/com/metamx/druid/guice/HttpModule.java deleted file mode 100644 index 214acdf28cf..00000000000 --- a/common/src/main/java/com/metamx/druid/guice/HttpModule.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.metamx.druid.guice; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import sun.net.www.http.HttpClient; - -import javax.validation.constraints.Min; - -/** - */ -public class HttpModule implements Module -{ - - @Override - public void configure(Binder binder) - { - JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class); - } - - public abstract static class DruidHttpClientConfig - { - @JsonProperty - @Min(0) - private int numConnections = 5; - - public int getNumConnections() - { - return numConnections; - } - } - - @Provides @LazySingleton @ManageLifecycle - public HttpClient makeHttpClient(DruidHttpClientConfig config) - { - return null; // TODO - } - -} diff --git a/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java b/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java index 223113728e3..f9d8afad575 100644 --- a/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java +++ b/common/src/main/java/com/metamx/druid/guice/LifecycleScope.java @@ -34,7 +34,7 @@ public class LifecycleScope implements Scope { return new Provider() { - private T value = null; + private volatile T value = null; @Override public synchronized T get() diff --git a/common/src/main/java/com/metamx/druid/guice/annotations/Global.java b/common/src/main/java/com/metamx/druid/guice/annotations/Global.java new file mode 100644 index 00000000000..497e1d17c6c --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/annotations/Global.java @@ -0,0 +1,17 @@ +package com.metamx.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Global +{ +} diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java index 7e2efe33bb3..5140fee9180 100644 --- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java +++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java @@ -26,8 +26,8 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.db.DbConnector; import com.metamx.druid.jackson.DefaultObjectMapper; import org.joda.time.DateTime; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.tweak.HandleCallback; @@ -42,7 +42,7 @@ public class DbUpdaterJob implements Jobby private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final HadoopDruidIndexerConfig config; - private final DBI dbi; + private final IDBI dbi; public DbUpdaterJob( HadoopDruidIndexerConfig config diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java index a647770b33b..a9c4ad7c60c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java @@ -28,16 +28,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.actions.TaskAction; -import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig; import com.metamx.emitter.EmittingLogger; - import org.joda.time.DateTime; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.StatementException; import org.skife.jdbi.v2.tweak.HandleCallback; @@ -48,11 +47,11 @@ public class DbTaskStorage implements TaskStorage { private final ObjectMapper jsonMapper; private final IndexerDbConnectorConfig dbConnectorConfig; - private final DBI dbi; + private final IDBI dbi; private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); - public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi) + public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, IDBI dbi) { this.jsonMapper = jsonMapper; this.dbConnectorConfig = dbConnectorConfig; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java index ea258269c84..3f893d79cdd 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java @@ -34,10 +34,10 @@ import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbTablesConfig; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.ResultIterator; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; @@ -59,13 +59,13 @@ public class MergerDBCoordinator private final ObjectMapper jsonMapper; private final DbConnectorConfig dbConnectorConfig; private final DbTablesConfig dbTables; - private final DBI dbi; + private final IDBI dbi; public MergerDBCoordinator( ObjectMapper jsonMapper, DbConnectorConfig dbConnectorConfig, DbTablesConfig dbTables, - DBI dbi + IDBI dbi ) { this.jsonMapper = jsonMapper; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index 5510a3c19c8..c98497a3602 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -126,7 +126,7 @@ import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.resource.ResourceCollection; import org.skife.config.ConfigurationObjectFactory; -import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.IDBI; import java.io.IOException; import java.io.InputStream; @@ -153,7 +153,7 @@ public class IndexerCoordinatorNode extends QueryableNode monitors = null; private ServiceEmitter emitter = null; private DbConnectorConfig dbConnectorConfig = null; - private DBI dbi = null; + private IDBI dbi = null; private IndexerCoordinatorConfig config = null; private MergerDBCoordinator mergerDBCoordinator = null; private ServiceDiscovery serviceDiscovery = null; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java index 7a7e0e8ed7f..660e4bc7599 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java @@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import org.joda.time.DateTime; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.IOException; @@ -18,12 +18,12 @@ public class DbSegmentPublisher implements SegmentPublisher private final ObjectMapper jsonMapper; private final DbSegmentPublisherConfig config; - private final DBI dbi; + private final IDBI dbi; public DbSegmentPublisher( ObjectMapper jsonMapper, DbSegmentPublisherConfig config, - DBI dbi + IDBI dbi ) { this.jsonMapper = jsonMapper; diff --git a/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java new file mode 100644 index 00000000000..d6d07dcafe8 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/client/indexing/IndexingServiceSelector.java @@ -0,0 +1,68 @@ +package com.metamx.druid.client.indexing; + +import com.google.inject.Inject; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.selector.DiscoverySelector; +import com.metamx.druid.client.selector.Server; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; + +/** +*/ +public class IndexingServiceSelector implements DiscoverySelector +{ + private static final Logger log = new Logger(IndexingServiceSelector.class); + + private final ServiceProvider serviceProvider; + + @Inject + public IndexingServiceSelector( + @IndexingService ServiceProvider serviceProvider + ) { + this.serviceProvider = serviceProvider; + } + + @Override + public Server pick() + { + final ServiceInstance instance; + try { + instance = serviceProvider.getInstance(); + } + catch (Exception e) { + log.info(e, ""); + return null; + } + + return new Server() + { + @Override + public String getHost() + { + return instance.getAddress(); + } + + @Override + public int getPort() + { + return instance.getPort(); + } + }; + } + + @LifecycleStart + public void start() throws Exception + { + serviceProvider.start(); + } + + @LifecycleStop + public void stop() throws IOException + { + serviceProvider.close(); + } +} diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java index 4d51df1a8d5..5e8550b0165 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java @@ -38,10 +38,10 @@ import com.metamx.druid.master.rules.Rule; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; @@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; public class DatabaseRuleManager { public static void createDefaultRule( - final DBI dbi, + final IDBI dbi, final String ruleTable, final String defaultTier, final ObjectMapper jsonMapper @@ -120,7 +120,7 @@ public class DatabaseRuleManager private final ScheduledExecutorService exec; private final DatabaseRuleManagerConfig config; private final Supplier dbTables; - private final DBI dbi; + private final IDBI dbi; private final AtomicReference>> rules; private final Object lock = new Object(); @@ -132,7 +132,7 @@ public class DatabaseRuleManager ObjectMapper jsonMapper, DatabaseRuleManagerConfig config, Supplier dbTables, - DBI dbi + IDBI dbi ) { this.jsonMapper = jsonMapper; diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index b51976f79e3..7b34febefe5 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -41,10 +41,10 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; import org.skife.jdbi.v2.Batch; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; @@ -72,7 +72,7 @@ public class DatabaseSegmentManager private final Supplier config; private final Supplier dbTables; private final AtomicReference> dataSources; - private final DBI dbi; + private final IDBI dbi; private volatile boolean started = false; @@ -81,7 +81,7 @@ public class DatabaseSegmentManager ObjectMapper jsonMapper, Supplier config, Supplier dbTables, - DBI dbi + IDBI dbi ) { this.jsonMapper = jsonMapper; diff --git a/server/src/main/java/com/metamx/druid/guice/HttpClientModule.java b/server/src/main/java/com/metamx/druid/guice/HttpClientModule.java new file mode 100644 index 00000000000..00ea9d01a7f --- /dev/null +++ b/server/src/main/java/com/metamx/druid/guice/HttpClientModule.java @@ -0,0 +1,65 @@ +package com.metamx.druid.guice; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.druid.guice.annotations.Global; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.validation.constraints.Min; + +/** + */ +public class HttpClientModule implements Module +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class); + } + + public abstract static class DruidHttpClientConfig + { + @JsonProperty + @Min(0) + private int numConnections = 5; + + @JsonProperty + private Period readTimeout = null; + + public int getNumConnections() + { + return numConnections; + } + + public Duration getReadTimeout() + { + return readTimeout.toStandardDuration(); + } + } + + @Provides @LazySingleton @Global + public HttpClient makeHttpClient(DruidHttpClientConfig config, Lifecycle lifecycle, @Nullable SSLContext sslContext) + { + final HttpClientConfig.Builder builder = HttpClientConfig + .builder() + .withNumConnections(config.getNumConnections()) + .withReadTimeout(config.getReadTimeout()); + + if (sslContext != null) { + builder.withSslContext(sslContext); + } + + return HttpClientInit.createClient(builder.build(), lifecycle); + } + + +} diff --git a/server/src/main/java/com/metamx/druid/guice/MasterModule.java b/server/src/main/java/com/metamx/druid/guice/MasterModule.java index 0d69a86d438..60d6ccbad6d 100644 --- a/server/src/main/java/com/metamx/druid/guice/MasterModule.java +++ b/server/src/main/java/com/metamx/druid/guice/MasterModule.java @@ -3,15 +3,22 @@ package com.metamx.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.druid.client.ServerInventoryViewConfig; +import com.metamx.druid.client.indexing.IndexingService; import com.metamx.druid.client.indexing.IndexingServiceClient; +import com.metamx.druid.client.indexing.IndexingServiceSelector; +import com.metamx.druid.client.selector.DiscoverySelector; +import com.metamx.druid.client.selector.Server; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbTablesConfig; import com.metamx.druid.initialization.ZkPathsConfig; -import com.metamx.http.client.HttpClient; -import org.skife.jdbi.v2.DBI; +import com.metamx.druid.master.DruidMasterConfig; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceProvider; +import org.skife.jdbi.v2.IDBI; /** */ @@ -26,17 +33,25 @@ public class MasterModule implements Module JsonConfigProvider.bind(binder, "druid.database.tables", DbTablesConfig.class); + binder.bind(new TypeLiteral>(){}) + .annotatedWith(IndexingService.class) + .to(IndexingServiceSelector.class) + .in(ManageLifecycle.class); + binder.bind(IndexingServiceClient.class).in(LazySingleton.class); } - @Provides - public IndexingServiceClient getIndexingServiceClient(HttpClient client) + @Provides @ManageLifecycle @IndexingService + public DiscoverySelector getIndexingServiceSelector(DruidMasterConfig config, ServiceDiscovery serviceDiscovery) { - // TODO - return null; + final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder() + .serviceName(config.getMergerServiceName()) + .build(); + + return new IndexingServiceSelector(serviceProvider); } @Provides @LazySingleton - public DBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle) + public IDBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle) { if (config.isCreateTables()) { lifecycle.addHandler( diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 74f7e52b465..12c2a87dc7f 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -22,12 +22,11 @@ package com.metamx.druid.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; -import com.google.inject.TypeLiteral; import com.google.inject.servlet.GuiceFilter; import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; @@ -48,6 +47,7 @@ import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DbConnector; import com.metamx.druid.guice.DruidGuiceExtensions; import com.metamx.druid.guice.DruidSecondaryModule; +import com.metamx.druid.guice.HttpClientModule; import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.MasterModule; import com.metamx.druid.guice.ServerModule; @@ -67,8 +67,6 @@ import com.metamx.druid.metrics.MetricsModule; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.x.discovery.ServiceDiscovery; -import org.apache.curator.x.discovery.ServiceProvider; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; @@ -76,11 +74,12 @@ import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.servlet.GzipFilter; import org.skife.config.ConfigurationObjectFactory; -import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.IDBI; import javax.annotation.Nullable; import java.net.URL; import java.util.Arrays; +import java.util.List; /** */ @@ -93,9 +92,9 @@ public class MasterMain LogLevelAdjuster.register(); Injector injector = makeInjector( - DruidSecondaryModule.class, new LifecycleModule(Key.get(MonitorScheduler.class)), EmitterModule.class, + HttpClientModule.class, CuratorModule.class, MetricsModule.class, DiscoveryModule.class, @@ -124,22 +123,10 @@ public class MasterMain final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class); final DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class); - final ServiceDiscovery serviceDiscovery = injector.getInstance(Key.get(new TypeLiteral>(){})); final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class); Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle); - ServiceProvider serviceProvider = null; - IndexingServiceClient indexingServiceClient = null; - if (druidMasterConfig.getMergerServiceName() != null) { - serviceProvider = Initialization.makeServiceProvider( - druidMasterConfig.getMergerServiceName(), - serviceDiscovery, - lifecycle - ); -// indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); TODO - } - - DBI dbi = injector.getInstance(DBI.class); + IDBI dbi = injector.getInstance(IDBI.class); // TODO: make tables and stuff final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class); DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable()); JacksonConfigManager configManager = new JacksonConfigManager( @@ -160,7 +147,7 @@ public class MasterMain curatorFramework, emitter, scheduledExecutorFactory, - indexingServiceClient, + injector.getInstance(IndexingServiceClient.class), taskMaster ); lifecycle.addManagedInstance(master); @@ -187,17 +174,6 @@ public class MasterMain ) ); - final Injector injector2 = Guice.createInjector( - new MasterServletModule( - serverInventoryView, - databaseSegmentManager, - databaseRuleManager, - master, - jsonMapper, - indexingServiceClient - ) - ); - final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class)); final RedirectInfo redirectInfo = new RedirectInfo() @@ -264,9 +240,14 @@ public class MasterMain new ConfigFactoryModule() ); + List actualModules = Lists.newArrayList(); + + actualModules.add(DruidSecondaryModule.class); + actualModules.addAll(Arrays.asList(modules)); + return Guice.createInjector( - Iterables.transform( - Arrays.asList(modules), + Lists.transform( + actualModules, new Function() { @Override diff --git a/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java b/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java index 223e55faa9d..0ccd2401c73 100644 --- a/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java +++ b/server/src/test/java/com/metamx/druid/db/DatabaseSegmentManagerTest.java @@ -26,7 +26,7 @@ import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import java.util.Arrays; @@ -38,13 +38,13 @@ import java.util.Map; public class DatabaseSegmentManagerTest { private DatabaseSegmentManager manager; - private DBI dbi; + private IDBI dbi; private List> testRows; @Before public void setUp() throws Exception { - dbi = EasyMock.createMock(DBI.class); + dbi = EasyMock.createMock(IDBI.class); manager = new DatabaseSegmentManager( new DefaultObjectMapper(), Suppliers.ofInstance(new DatabaseSegmentManagerConfig()),