mirror of https://github.com/apache/druid.git
1) Inject IndexingServiceClient
2) Switch all the DBI references to IDBI
This commit is contained in:
parent
f68df7ab69
commit
2f56c24259
|
@ -0,0 +1,17 @@
|
|||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface IndexingService
|
||||
{
|
||||
}
|
|
@ -25,10 +25,11 @@ import com.google.inject.Inject;
|
|||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.selector.DiscoverySelector;
|
||||
import com.metamx.druid.client.selector.Server;
|
||||
import com.metamx.druid.guice.annotations.Global;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||
import org.apache.curator.x.discovery.ServiceInstance;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
@ -42,13 +43,13 @@ public class IndexingServiceClient
|
|||
|
||||
private final HttpClient client;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ServiceProvider serviceProvider;
|
||||
private final DiscoverySelector<Server> serviceProvider;
|
||||
|
||||
@Inject
|
||||
public IndexingServiceClient(
|
||||
HttpClient client,
|
||||
@Global HttpClient client,
|
||||
ObjectMapper jsonMapper,
|
||||
ServiceProvider serviceProvider
|
||||
@IndexingService DiscoverySelector<Server> serviceProvider
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
|
@ -105,12 +106,12 @@ public class IndexingServiceClient
|
|||
private String baseUrl()
|
||||
{
|
||||
try {
|
||||
final ServiceInstance instance = serviceProvider.getInstance();
|
||||
final Server instance = serviceProvider.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexingService");
|
||||
}
|
||||
|
||||
return String.format("http://%s:%s/druid/indexer/v1", instance.getAddress(), instance.getPort());
|
||||
return String.format("http://%s:%s/druid/indexer/v1", instance.getHost(), instance.getPort());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package com.metamx.druid.client.selector;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface DiscoverySelector<T>
|
||||
{
|
||||
public T pick();
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.metamx.druid.client.selector;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface Server
|
||||
{
|
||||
public String getHost();
|
||||
public int getPort();
|
||||
}
|
|
@ -22,7 +22,6 @@ package com.metamx.druid.client.selector;
|
|||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -30,7 +29,7 @@ import java.util.Set;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class ServerSelector
|
||||
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
||||
{
|
||||
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
|
||||
{
|
||||
|
|
|
@ -32,7 +32,6 @@ import com.google.inject.name.Named;
|
|||
import com.google.inject.name.Names;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.guice.DruidScopes;
|
||||
import com.metamx.druid.guice.LazySingleton;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.core.Emitter;
|
||||
|
@ -66,7 +65,7 @@ public class EmitterModule implements Module
|
|||
binder.install(new LogEmitterModule());
|
||||
binder.install(new HttpEmitterModule());
|
||||
|
||||
binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(DruidScopes.SINGLETON);
|
||||
binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.metamx.druid.config;
|
|||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
|
@ -42,7 +43,8 @@ public class ConfigManager
|
|||
|
||||
private volatile ConfigManager.PollingCallable poller;
|
||||
|
||||
public ConfigManager(IDBI dbi, ConfigManagerConfig config)
|
||||
@Inject
|
||||
public ConfigManager(IDBI dbi, ConfigManagerConfig config) // TODO: use DbTables and a different config
|
||||
{
|
||||
this.dbi = dbi;
|
||||
this.config = config;
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -34,6 +35,7 @@ public class JacksonConfigManager
|
|||
private final ConfigManager configManager;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public JacksonConfigManager(
|
||||
ConfigManager configManager,
|
||||
ObjectMapper jsonMapper
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.metamx.common.logger.Logger;
|
|||
import org.apache.commons.dbcp.BasicDataSource;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
@ -36,7 +37,7 @@ public class DbConnector
|
|||
{
|
||||
private static final Logger log = new Logger(DbConnector.class);
|
||||
|
||||
public static void createSegmentTable(final DBI dbi, final String segmentTableName)
|
||||
public static void createSegmentTable(final IDBI dbi, final String segmentTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -48,7 +49,7 @@ public class DbConnector
|
|||
);
|
||||
}
|
||||
|
||||
public static void createRuleTable(final DBI dbi, final String ruleTableName)
|
||||
public static void createRuleTable(final IDBI dbi, final String ruleTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -60,7 +61,7 @@ public class DbConnector
|
|||
);
|
||||
}
|
||||
|
||||
public static void createConfigTable(final DBI dbi, final String configTableName)
|
||||
public static void createConfigTable(final IDBI dbi, final String configTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -72,7 +73,7 @@ public class DbConnector
|
|||
);
|
||||
}
|
||||
|
||||
public static void createTaskTable(final DBI dbi, final String taskTableName)
|
||||
public static void createTaskTable(final IDBI dbi, final String taskTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -92,7 +93,7 @@ public class DbConnector
|
|||
);
|
||||
}
|
||||
|
||||
public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName)
|
||||
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -110,7 +111,7 @@ public class DbConnector
|
|||
);
|
||||
}
|
||||
|
||||
public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName)
|
||||
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName)
|
||||
{
|
||||
createTable(
|
||||
dbi,
|
||||
|
@ -129,7 +130,7 @@ public class DbConnector
|
|||
}
|
||||
|
||||
public static void createTable(
|
||||
final DBI dbi,
|
||||
final IDBI dbi,
|
||||
final String tableName,
|
||||
final String sql
|
||||
)
|
||||
|
|
|
@ -1,40 +0,0 @@
|
|||
package com.metamx.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import sun.net.www.http.HttpClient;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HttpModule implements Module
|
||||
{
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class);
|
||||
}
|
||||
|
||||
public abstract static class DruidHttpClientConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int numConnections = 5;
|
||||
|
||||
public int getNumConnections()
|
||||
{
|
||||
return numConnections;
|
||||
}
|
||||
}
|
||||
|
||||
@Provides @LazySingleton @ManageLifecycle
|
||||
public HttpClient makeHttpClient(DruidHttpClientConfig config)
|
||||
{
|
||||
return null; // TODO
|
||||
}
|
||||
|
||||
}
|
|
@ -34,7 +34,7 @@ public class LifecycleScope implements Scope
|
|||
{
|
||||
return new Provider<T>()
|
||||
{
|
||||
private T value = null;
|
||||
private volatile T value = null;
|
||||
|
||||
@Override
|
||||
public synchronized T get()
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package com.metamx.druid.guice.annotations;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface Global
|
||||
{
|
||||
}
|
|
@ -26,8 +26,8 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import org.joda.time.DateTime;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.PreparedBatch;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
|
@ -42,7 +42,7 @@ public class DbUpdaterJob implements Jobby
|
|||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
|
||||
public DbUpdaterJob(
|
||||
HadoopDruidIndexerConfig config
|
||||
|
|
|
@ -28,16 +28,15 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.druid.indexing.common.TaskLock;
|
||||
import com.metamx.druid.indexing.common.TaskStatus;
|
||||
import com.metamx.druid.indexing.common.actions.TaskAction;
|
||||
import com.metamx.druid.indexing.common.TaskLock;
|
||||
import com.metamx.druid.indexing.common.task.Task;
|
||||
import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.exceptions.StatementException;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
|
@ -48,11 +47,11 @@ public class DbTaskStorage implements TaskStorage
|
|||
{
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final IndexerDbConnectorConfig dbConnectorConfig;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
|
||||
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
|
||||
|
||||
public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi)
|
||||
public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, IDBI dbi)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.dbConnectorConfig = dbConnectorConfig;
|
||||
|
|
|
@ -34,10 +34,10 @@ import com.metamx.druid.db.DbConnectorConfig;
|
|||
import com.metamx.druid.db.DbTablesConfig;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.ResultIterator;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
import org.skife.jdbi.v2.TransactionCallback;
|
||||
|
@ -59,13 +59,13 @@ public class MergerDBCoordinator
|
|||
private final ObjectMapper jsonMapper;
|
||||
private final DbConnectorConfig dbConnectorConfig;
|
||||
private final DbTablesConfig dbTables;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
|
||||
public MergerDBCoordinator(
|
||||
ObjectMapper jsonMapper,
|
||||
DbConnectorConfig dbConnectorConfig,
|
||||
DbTablesConfig dbTables,
|
||||
DBI dbi
|
||||
IDBI dbi
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
|
|
@ -126,7 +126,7 @@ import org.mortbay.jetty.servlet.FilterHolder;
|
|||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.mortbay.resource.ResourceCollection;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -153,7 +153,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
|
|||
private List<Monitor> monitors = null;
|
||||
private ServiceEmitter emitter = null;
|
||||
private DbConnectorConfig dbConnectorConfig = null;
|
||||
private DBI dbi = null;
|
||||
private IDBI dbi = null;
|
||||
private IndexerCoordinatorConfig config = null;
|
||||
private MergerDBCoordinator mergerDBCoordinator = null;
|
||||
private ServiceDiscovery<Void> serviceDiscovery = null;
|
||||
|
|
|
@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -18,12 +18,12 @@ public class DbSegmentPublisher implements SegmentPublisher
|
|||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final DbSegmentPublisherConfig config;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
|
||||
public DbSegmentPublisher(
|
||||
ObjectMapper jsonMapper,
|
||||
DbSegmentPublisherConfig config,
|
||||
DBI dbi
|
||||
IDBI dbi
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.selector.DiscoverySelector;
|
||||
import com.metamx.druid.client.selector.Server;
|
||||
import org.apache.curator.x.discovery.ServiceInstance;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class IndexingServiceSelector implements DiscoverySelector<Server>
|
||||
{
|
||||
private static final Logger log = new Logger(IndexingServiceSelector.class);
|
||||
|
||||
private final ServiceProvider serviceProvider;
|
||||
|
||||
@Inject
|
||||
public IndexingServiceSelector(
|
||||
@IndexingService ServiceProvider serviceProvider
|
||||
) {
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Server pick()
|
||||
{
|
||||
final ServiceInstance instance;
|
||||
try {
|
||||
instance = serviceProvider.getInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info(e, "");
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Server()
|
||||
{
|
||||
@Override
|
||||
public String getHost()
|
||||
{
|
||||
return instance.getAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort()
|
||||
{
|
||||
return instance.getPort();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start() throws Exception
|
||||
{
|
||||
serviceProvider.start();
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop() throws IOException
|
||||
{
|
||||
serviceProvider.close();
|
||||
}
|
||||
}
|
|
@ -38,10 +38,10 @@ import com.metamx.druid.master.rules.Rule;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
|
@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
public class DatabaseRuleManager
|
||||
{
|
||||
public static void createDefaultRule(
|
||||
final DBI dbi,
|
||||
final IDBI dbi,
|
||||
final String ruleTable,
|
||||
final String defaultTier,
|
||||
final ObjectMapper jsonMapper
|
||||
|
@ -120,7 +120,7 @@ public class DatabaseRuleManager
|
|||
private final ScheduledExecutorService exec;
|
||||
private final DatabaseRuleManagerConfig config;
|
||||
private final Supplier<DbTablesConfig> dbTables;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
@ -132,7 +132,7 @@ public class DatabaseRuleManager
|
|||
ObjectMapper jsonMapper,
|
||||
DatabaseRuleManagerConfig config,
|
||||
Supplier<DbTablesConfig> dbTables,
|
||||
DBI dbi
|
||||
IDBI dbi
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
|
|
@ -41,10 +41,10 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
import org.skife.jdbi.v2.Batch;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
|
@ -72,7 +72,7 @@ public class DatabaseSegmentManager
|
|||
private final Supplier<DatabaseSegmentManagerConfig> config;
|
||||
private final Supplier<DbTablesConfig> dbTables;
|
||||
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
|
||||
private final DBI dbi;
|
||||
private final IDBI dbi;
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
|
@ -81,7 +81,7 @@ public class DatabaseSegmentManager
|
|||
ObjectMapper jsonMapper,
|
||||
Supplier<DatabaseSegmentManagerConfig> config,
|
||||
Supplier<DbTablesConfig> dbTables,
|
||||
DBI dbi
|
||||
IDBI dbi
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
package com.metamx.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.druid.guice.annotations.Global;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.HttpClientConfig;
|
||||
import com.metamx.http.client.HttpClientInit;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HttpClientModule implements Module
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class);
|
||||
}
|
||||
|
||||
public abstract static class DruidHttpClientConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int numConnections = 5;
|
||||
|
||||
@JsonProperty
|
||||
private Period readTimeout = null;
|
||||
|
||||
public int getNumConnections()
|
||||
{
|
||||
return numConnections;
|
||||
}
|
||||
|
||||
public Duration getReadTimeout()
|
||||
{
|
||||
return readTimeout.toStandardDuration();
|
||||
}
|
||||
}
|
||||
|
||||
@Provides @LazySingleton @Global
|
||||
public HttpClient makeHttpClient(DruidHttpClientConfig config, Lifecycle lifecycle, @Nullable SSLContext sslContext)
|
||||
{
|
||||
final HttpClientConfig.Builder builder = HttpClientConfig
|
||||
.builder()
|
||||
.withNumConnections(config.getNumConnections())
|
||||
.withReadTimeout(config.getReadTimeout());
|
||||
|
||||
if (sslContext != null) {
|
||||
builder.withSslContext(sslContext);
|
||||
}
|
||||
|
||||
return HttpClientInit.createClient(builder.build(), lifecycle);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -3,15 +3,22 @@ package com.metamx.druid.guice;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.druid.client.ServerInventoryViewConfig;
|
||||
import com.metamx.druid.client.indexing.IndexingService;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceSelector;
|
||||
import com.metamx.druid.client.selector.DiscoverySelector;
|
||||
import com.metamx.druid.client.selector.Server;
|
||||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.db.DbConnectorConfig;
|
||||
import com.metamx.druid.db.DbTablesConfig;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import com.metamx.druid.master.DruidMasterConfig;
|
||||
import org.apache.curator.x.discovery.ServiceDiscovery;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -26,17 +33,25 @@ public class MasterModule implements Module
|
|||
|
||||
JsonConfigProvider.bind(binder, "druid.database.tables", DbTablesConfig.class);
|
||||
|
||||
binder.bind(new TypeLiteral<DiscoverySelector<Server>>(){})
|
||||
.annotatedWith(IndexingService.class)
|
||||
.to(IndexingServiceSelector.class)
|
||||
.in(ManageLifecycle.class);
|
||||
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
public IndexingServiceClient getIndexingServiceClient(HttpClient client)
|
||||
@Provides @ManageLifecycle @IndexingService
|
||||
public DiscoverySelector<Server> getIndexingServiceSelector(DruidMasterConfig config, ServiceDiscovery serviceDiscovery)
|
||||
{
|
||||
// TODO
|
||||
return null;
|
||||
final ServiceProvider serviceProvider = serviceDiscovery.serviceProviderBuilder()
|
||||
.serviceName(config.getMergerServiceName())
|
||||
.build();
|
||||
|
||||
return new IndexingServiceSelector(serviceProvider);
|
||||
}
|
||||
|
||||
@Provides @LazySingleton
|
||||
public DBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle)
|
||||
public IDBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle)
|
||||
{
|
||||
if (config.isCreateTables()) {
|
||||
lifecycle.addHandler(
|
||||
|
|
|
@ -22,12 +22,11 @@ package com.metamx.druid.http;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
|
@ -48,6 +47,7 @@ import com.metamx.druid.db.DatabaseSegmentManager;
|
|||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.guice.DruidGuiceExtensions;
|
||||
import com.metamx.druid.guice.DruidSecondaryModule;
|
||||
import com.metamx.druid.guice.HttpClientModule;
|
||||
import com.metamx.druid.guice.LifecycleModule;
|
||||
import com.metamx.druid.guice.MasterModule;
|
||||
import com.metamx.druid.guice.ServerModule;
|
||||
|
@ -67,8 +67,6 @@ import com.metamx.druid.metrics.MetricsModule;
|
|||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.x.discovery.ServiceDiscovery;
|
||||
import org.apache.curator.x.discovery.ServiceProvider;
|
||||
import org.mortbay.jetty.Server;
|
||||
import org.mortbay.jetty.servlet.Context;
|
||||
import org.mortbay.jetty.servlet.DefaultServlet;
|
||||
|
@ -76,11 +74,12 @@ import org.mortbay.jetty.servlet.FilterHolder;
|
|||
import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import org.mortbay.servlet.GzipFilter;
|
||||
import org.skife.config.ConfigurationObjectFactory;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -93,9 +92,9 @@ public class MasterMain
|
|||
LogLevelAdjuster.register();
|
||||
|
||||
Injector injector = makeInjector(
|
||||
DruidSecondaryModule.class,
|
||||
new LifecycleModule(Key.get(MonitorScheduler.class)),
|
||||
EmitterModule.class,
|
||||
HttpClientModule.class,
|
||||
CuratorModule.class,
|
||||
MetricsModule.class,
|
||||
DiscoveryModule.class,
|
||||
|
@ -124,22 +123,10 @@ public class MasterMain
|
|||
final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
|
||||
final DruidNodeConfig nodeConfig = configFactory.build(DruidNodeConfig.class);
|
||||
|
||||
final ServiceDiscovery<Void> serviceDiscovery = injector.getInstance(Key.get(new TypeLiteral<ServiceDiscovery<Void>>(){}));
|
||||
final ServiceAnnouncer serviceAnnouncer = injector.getInstance(ServiceAnnouncer.class);
|
||||
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
|
||||
|
||||
ServiceProvider serviceProvider = null;
|
||||
IndexingServiceClient indexingServiceClient = null;
|
||||
if (druidMasterConfig.getMergerServiceName() != null) {
|
||||
serviceProvider = Initialization.makeServiceProvider(
|
||||
druidMasterConfig.getMergerServiceName(),
|
||||
serviceDiscovery,
|
||||
lifecycle
|
||||
);
|
||||
// indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); TODO
|
||||
}
|
||||
|
||||
DBI dbi = injector.getInstance(DBI.class);
|
||||
IDBI dbi = injector.getInstance(IDBI.class); // TODO: make tables and stuff
|
||||
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
|
||||
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
|
||||
JacksonConfigManager configManager = new JacksonConfigManager(
|
||||
|
@ -160,7 +147,7 @@ public class MasterMain
|
|||
curatorFramework,
|
||||
emitter,
|
||||
scheduledExecutorFactory,
|
||||
indexingServiceClient,
|
||||
injector.getInstance(IndexingServiceClient.class),
|
||||
taskMaster
|
||||
);
|
||||
lifecycle.addManagedInstance(master);
|
||||
|
@ -187,17 +174,6 @@ public class MasterMain
|
|||
)
|
||||
);
|
||||
|
||||
final Injector injector2 = Guice.createInjector(
|
||||
new MasterServletModule(
|
||||
serverInventoryView,
|
||||
databaseSegmentManager,
|
||||
databaseRuleManager,
|
||||
master,
|
||||
jsonMapper,
|
||||
indexingServiceClient
|
||||
)
|
||||
);
|
||||
|
||||
final Server server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
|
||||
|
||||
final RedirectInfo redirectInfo = new RedirectInfo()
|
||||
|
@ -264,9 +240,14 @@ public class MasterMain
|
|||
new ConfigFactoryModule()
|
||||
);
|
||||
|
||||
List<Object> actualModules = Lists.newArrayList();
|
||||
|
||||
actualModules.add(DruidSecondaryModule.class);
|
||||
actualModules.addAll(Arrays.asList(modules));
|
||||
|
||||
return Guice.createInjector(
|
||||
Iterables.transform(
|
||||
Arrays.asList(modules),
|
||||
Lists.transform(
|
||||
actualModules,
|
||||
new Function<Object, Module>()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.easymock.EasyMock;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -38,13 +38,13 @@ import java.util.Map;
|
|||
public class DatabaseSegmentManagerTest
|
||||
{
|
||||
private DatabaseSegmentManager manager;
|
||||
private DBI dbi;
|
||||
private IDBI dbi;
|
||||
private List<Map<String, Object>> testRows;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
dbi = EasyMock.createMock(DBI.class);
|
||||
dbi = EasyMock.createMock(IDBI.class);
|
||||
manager = new DatabaseSegmentManager(
|
||||
new DefaultObjectMapper(),
|
||||
Suppliers.ofInstance(new DatabaseSegmentManagerConfig()),
|
||||
|
|
Loading…
Reference in New Issue