1) Make tests work and continue trying to make the DruidMaster start up with just Guice

This commit is contained in:
cheddar 2013-06-07 12:01:46 -07:00
parent 9df458a065
commit f68df7ab69
20 changed files with 143 additions and 176 deletions

View File

@ -21,6 +21,7 @@ package com.metamx.druid.client.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
@ -43,6 +44,7 @@ public class IndexingServiceClient
private final ObjectMapper jsonMapper;
private final ServiceProvider serviceProvider;
@Inject
public IndexingServiceClient(
HttpClient client,
ObjectMapper jsonMapper,

View File

@ -0,0 +1,40 @@
package com.metamx.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import sun.net.www.http.HttpClient;
import javax.validation.constraints.Min;
/**
*/
public class HttpModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.global.http", DruidHttpClientConfig.class);
}
public abstract static class DruidHttpClientConfig
{
@JsonProperty
@Min(0)
private int numConnections = 5;
public int getNumConnections()
{
return numConnections;
}
}
@Provides @LazySingleton @ManageLifecycle
public HttpClient makeHttpClient(DruidHttpClientConfig config)
{
return null; // TODO
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
@ -43,7 +42,6 @@ public class DbUpdaterJob implements Jobby
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJobSpec spec;
private final DBI dbi;
public DbUpdaterJob(
@ -51,8 +49,7 @@ public class DbUpdaterJob implements Jobby
)
{
this.config = config;
this.spec = (DbUpdaterJobSpec) config.getUpdaterJobSpec();
this.dbi = new DbConnector(spec).getDBI();
this.dbi = new DbConnector(config.getUpdaterJobSpec(), null).getDBI();
}
@Override
@ -70,13 +67,13 @@ public class DbUpdaterJob implements Jobby
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
spec.getSegmentTable()
config.getUpdaterJobSpec().getSegmentTable()
)
);
for (final DataSegment segment : segments) {
batch.add(
new ImmutableMap.Builder()
new ImmutableMap.Builder<String, Object>()
.put("id", segment.getIdentifier())
.put("dataSource", segment.getDataSource())
.put("created_date", new DateTime().toString())

View File

@ -51,7 +51,7 @@ import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.path.PathSpec;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.indexer.updater.UpdaterJobSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.shard.ShardSpec;
@ -59,7 +59,6 @@ import com.metamx.druid.utils.JodaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
@ -178,7 +177,7 @@ public class HadoopDruidIndexerConfig
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of();
private volatile boolean overwriteFiles = false;
private volatile DataRollupSpec rollupSpec;
private volatile UpdaterJobSpec updaterJobSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile List<String> registererers = Lists.newArrayList();
@ -203,7 +202,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") UpdaterJobSpec updaterJobSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("registererers") List<String> registererers
)
@ -497,12 +496,12 @@ public class HadoopDruidIndexerConfig
}
@JsonProperty
public UpdaterJobSpec getUpdaterJobSpec()
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
public void setUpdaterJobSpec(UpdaterJobSpec updaterJobSpec)
public void setUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec)
{
this.updaterJobSpec = updaterJobSpec;
}

View File

@ -20,11 +20,12 @@
package com.metamx.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.metamx.druid.db.DbConnectorConfig;
/**
*/
public class DbUpdaterJobSpec extends DbConnectorConfig implements UpdaterJobSpec
public class DbUpdaterJobSpec implements Supplier<DbConnectorConfig>
{
@JsonProperty("connectURI")
public String connectURI;
@ -38,26 +39,33 @@ public class DbUpdaterJobSpec extends DbConnectorConfig implements UpdaterJobSpe
@JsonProperty("segmentTable")
public String segmentTable;
@Override
public String getDatabaseConnectURI()
{
return connectURI;
}
@Override
public String getDatabaseUser()
{
return user;
}
@Override
public String getDatabasePassword()
{
return password;
}
public String getSegmentTable()
{
return segmentTable;
}
@Override
public DbConnectorConfig get()
{
return new DbConnectorConfig()
{
@Override
public String getConnectURI()
{
return connectURI;
}
@Override
public String getUser()
{
return user;
}
@Override
public String getPassword()
{
return password;
}
};
}
}

View File

@ -1,34 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "zk", value = ZkUpdaterJobSpec.class),
@JsonSubTypes.Type(name = "db", value = DbUpdaterJobSpec.class)
})
public interface UpdaterJobSpec
{
}

View File

@ -1,50 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class ZkUpdaterJobSpec implements UpdaterJobSpec
{
@JsonProperty("zkHosts")
public String zkQuorum;
@JsonProperty("zkBasePath")
private String zkBasePath;
public ZkUpdaterJobSpec() {}
public String getZkQuorum()
{
return zkQuorum;
}
public String getZkBasePath()
{
return zkBasePath;
}
public boolean postToZk()
{
return !(zkQuorum == null || zkBasePath == null);
}
}

View File

@ -22,11 +22,11 @@ package com.metamx.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -367,12 +367,14 @@ public class HadoopDruidIndexerConfigTest
HadoopDruidIndexerConfig.class
);
final DbUpdaterJobSpec spec = (DbUpdaterJobSpec) cfg.getUpdaterJobSpec();
final DbUpdaterJobSpec spec = cfg.getUpdaterJobSpec();
final DbConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", spec.getDatabaseConnectURI());
Assert.assertEquals("rofl", spec.getDatabaseUser());
Assert.assertEquals("p4ssw0rd", spec.getDatabasePassword());
Assert.assertEquals(false, spec.useValidationQuery());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());
Assert.assertEquals("rofl", connectorConfig.getUser());
Assert.assertEquals("p4ssw0rd", connectorConfig.getPassword());
Assert.assertEquals(false, connectorConfig.isUseValidationQuery());
}
@Test

View File

@ -29,12 +29,12 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.HadoopDruidIndexerJob;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.utils.JodaUtils;
import org.joda.time.DateTime;
@ -51,7 +51,7 @@ public class HadoopIndexTask extends AbstractTask
* @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
* <p/>
* Here, we will ensure that the UpdaterJobSpec field of the config is set to null, such that the
* Here, we will ensure that the DbConnectorConfig field of the config is set to null, such that the
* job does not push a list of published segments the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.

View File

@ -31,6 +31,7 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.DBI;
@ -57,16 +58,19 @@ public class MergerDBCoordinator
private final ObjectMapper jsonMapper;
private final DbConnectorConfig dbConnectorConfig;
private final DbTablesConfig dbTables;
private final DBI dbi;
public MergerDBCoordinator(
ObjectMapper jsonMapper,
DbConnectorConfig dbConnectorConfig,
DbTablesConfig dbTables,
DBI dbi
)
{
this.jsonMapper = jsonMapper;
this.dbConnectorConfig = dbConnectorConfig;
this.dbTables = dbTables;
this.dbi = dbi;
}
@ -87,7 +91,7 @@ public class MergerDBCoordinator
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE used = 1 AND dataSource = :dataSource",
dbConnectorConfig.getSegmentTable()
dbTables.getSegmentsTable()
)
)
.bind("dataSource", dataSource)
@ -170,7 +174,7 @@ public class MergerDBCoordinator
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbConnectorConfig.getSegmentTable()
dbTables.getSegmentsTable()
)
).bind(
"identifier",
@ -185,7 +189,7 @@ public class MergerDBCoordinator
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbConnectorConfig.getSegmentTable()
dbTables.getSegmentsTable()
)
)
.bind("id", segment.getIdentifier())
@ -230,7 +234,7 @@ public class MergerDBCoordinator
private void deleteSegment(final Handle handle, final DataSegment segment)
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
).bind("id", segment.getIdentifier())
.execute();
}
@ -246,7 +250,7 @@ public class MergerDBCoordinator
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbConnectorConfig.getSegmentTable()
dbTables.getSegmentsTable()
)
)
.bind("dataSource", dataSource)

View File

@ -25,15 +25,15 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework;

View File

@ -24,7 +24,6 @@ import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
@ -56,11 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@ -101,6 +95,11 @@ import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -108,7 +107,6 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
@ -319,7 +317,6 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
root.addFilter(
new FilterHolder(
new RedirectFilter(
new ToStringResponseHandler(Charsets.UTF_8),
new RedirectInfo()
{
@Override
@ -563,6 +560,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
mergerDBCoordinator = new MergerDBCoordinator(
getJsonMapper(),
dbConnectorConfig,
null, // TODO
dbi
);
}

View File

@ -43,6 +43,17 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.indexing.worker.config.WorkerConfig;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNodeConfig;
import com.metamx.druid.initialization.Initialization;
@ -52,17 +63,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;

View File

@ -36,20 +36,12 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.LockAcquireAction;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionToolbox;
@ -59,6 +51,12 @@ import com.metamx.druid.indexing.common.task.IndexTask;
import com.metamx.druid.indexing.common.task.KillTask;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
@ -417,7 +415,7 @@ public class TaskLifecycleTest
private MockMergerDBCoordinator()
{
super(null, null, null);
super(null, null, null, null);
}
@Override

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
@ -235,7 +236,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
segmentPublisher = new DbSegmentPublisher(
getJsonMapper(),
dbSegmentPublisherConfig,
new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
new DbConnector(Suppliers.ofInstance(getConfigFactory().build(DbConnectorConfig.class)), null).getDBI() // TODO
);
getLifecycle().addManagedInstance(segmentPublisher);
}

View File

@ -106,7 +106,7 @@ public class DatabaseSegmentManager
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.get().getPollDuration(),
config.get().getPollDuration().toStandardDuration(),
new Runnable()
{
@Override

View File

@ -20,16 +20,16 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
/**
*/
public class DatabaseSegmentManagerConfig
{
@JsonProperty
private Duration pollDuration = new Duration("PT1M");
private Period pollDuration = new Period("PT1M");
public Duration getPollDuration()
public Period getPollDuration()
{
return pollDuration;
}

View File

@ -5,10 +5,12 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.http.client.HttpClient;
import org.skife.jdbi.v2.DBI;
/**
@ -26,6 +28,13 @@ public class MasterModule implements Module
}
@Provides
public IndexingServiceClient getIndexingServiceClient(HttpClient client)
{
// TODO
return null;
}
@Provides @LazySingleton
public DBI getDbi(final DbConnector dbConnector, final DbConnectorConfig config, Lifecycle lifecycle)
{

View File

@ -20,7 +20,6 @@
package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -66,7 +65,6 @@ import com.metamx.druid.master.DruidMasterConfig;
import com.metamx.druid.master.LoadQueueTaskMaster;
import com.metamx.druid.metrics.MetricsModule;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.metamx.metrics.MonitorScheduler;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
@ -131,6 +129,7 @@ public class MasterMain
Initialization.announceDefaultService(nodeConfig, serviceAnnouncer, lifecycle);
ServiceProvider serviceProvider = null;
IndexingServiceClient indexingServiceClient = null;
if (druidMasterConfig.getMergerServiceName() != null) {
serviceProvider = Initialization.makeServiceProvider(
druidMasterConfig.getMergerServiceName(),
@ -139,7 +138,6 @@ public class MasterMain
);
// indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); TODO
}
IndexingServiceClient indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
DBI dbi = injector.getInstance(DBI.class);
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
@ -246,7 +244,6 @@ public class MasterMain
root.addFilter(
new FilterHolder(
new RedirectFilter(
new ToStringResponseHandler(Charsets.UTF_8),
redirectInfo
)
), "/*", 0

View File

@ -20,7 +20,6 @@
package com.metamx.druid.http;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.response.HttpResponseHandler;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
@ -39,15 +38,12 @@ public class RedirectFilter implements Filter
{
private static final Logger log = new Logger(RedirectFilter.class);
private final HttpResponseHandler<StringBuilder, String> responseHandler;
private final RedirectInfo redirectInfo;
public RedirectFilter(
HttpResponseHandler<StringBuilder, String> responseHandler,
RedirectInfo redirectInfo
)
{
this.responseHandler = responseHandler;
this.redirectInfo = redirectInfo;
}