cleaned up the code

This commit is contained in:
jisookim0513 2014-09-27 13:10:01 -07:00
parent aa887edb73
commit 74565c9371
42 changed files with 228 additions and 936 deletions

View File

@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;
@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
private final MetadataDbConnector dbConnector;
private final MetadataStorageConnector dbConnector;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
@ -58,7 +58,7 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
public ConfigManager(MetadataDbConnector dbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
public ConfigManager(MetadataStorageConnector dbConnector, Supplier<MetadataStorageTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbConnector = dbConnector;
this.config = config;

View File

@ -23,9 +23,8 @@ package io.druid.db;
*/
public interface MetadataStorageConnector
{
public Void insertOrUpdate(
final String storageName,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
@ -34,7 +33,7 @@ public interface MetadataStorageConnector
public byte[] lookup(
final String storageName,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key

View File

@ -27,8 +27,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import io.druid.common.config.ConfigManager;
import io.druid.common.config.ConfigManagerConfig;
import io.druid.common.config.JacksonConfigManager;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
/**
*/
@ -43,8 +43,8 @@ public class JacksonConfigManagerModule implements Module
@Provides @ManageLifecycle
public ConfigManager getConfigManager(
final MetadataDbConnector dbConnector,
final Supplier<MetadataTablesConfig> dbTables,
final MetadataStorageConnector dbConnector,
final Supplier<MetadataStorageTablesConfig> dbTables,
final Supplier<ConfigManagerConfig> config,
final Lifecycle lifecycle
)

View File

@ -34,8 +34,8 @@ public interface IndexerMetadataStorageCoordinator
throws IOException;
/**
* Attempts to insert a set of segments to the database. Returns the set of segments actually added (segments
* with identifiers already in the database will not be added).
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
* with identifiers already in the metadata storage will not be added).
*
* @param segments set of segments to add
* @return set of segments actually added

View File

@ -24,12 +24,10 @@ import java.util.Map;
public interface MetadataStorageActionHandler
{
//<T> T retryCall(Action<T> action);
/* Insert stuff. @returns number of entries inserted on success */
public void insert(
String tableName,
String Id,
String id,
String createdDate,
String dataSource,
byte[] payload,
@ -68,8 +66,4 @@ public interface MetadataStorageActionHandler
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(String tableName, String Id);
/* Initialize and return new DbConnector */
// fpublic MetadataDbConnector getConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables);
}

View File

@ -1,15 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>druid-postgres-storage</artifactId>
</project>

View File

@ -32,7 +32,7 @@ public class HadoopDruidIndexerJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidIndexerJob.class);
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@ -45,9 +45,9 @@ public class HadoopDruidIndexerJob implements Jobby
this.config = config;
if (config.isUpdaterJobSpecSet()) {
dbUpdaterJob = new DbUpdaterJob(config);
metadataStorageUpdaterJob = new MetadataStorageUpdaterJob(config);
} else {
dbUpdaterJob = null;
metadataStorageUpdaterJob = null;
}
}
@ -60,8 +60,8 @@ public class HadoopDruidIndexerJob implements Jobby
indexJob = new IndexGeneratorJob(config);
jobs.add(indexJob);
if (dbUpdaterJob != null) {
jobs.add(dbUpdaterJob);
if (metadataStorageUpdaterJob != null) {
jobs.add(metadataStorageUpdaterJob);
} else {
log.info("No updaterJobSpec set, not uploading to database");
}

View File

@ -21,7 +21,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.segment.indexing.IOConfig;
import java.util.Map;
@ -32,12 +32,12 @@ import java.util.Map;
public class HadoopIOConfig implements IOConfig
{
private final Map<String, Object> pathSpec;
private final DbUpdaterJobSpec metadataUpdateSpec;
private final MetadataStorageUpdaterJobSpec metadataUpdateSpec;
private final String segmentOutputPath;
public HadoopIOConfig(
final @JsonProperty("inputSpec") Map<String, Object> pathSpec,
final @JsonProperty("metadataUpdateSpec") DbUpdaterJobSpec metadataUpdateSpec,
final @JsonProperty("metadataUpdateSpec") MetadataStorageUpdaterJobSpec metadataUpdateSpec,
final @JsonProperty("segmentOutputPath") String segmentOutputPath
)
{
@ -53,7 +53,7 @@ public class HadoopIOConfig implements IOConfig
}
@JsonProperty("metadataUpdateSpec")
public DbUpdaterJobSpec getMetadataUpdateSpec()
public MetadataStorageUpdaterJobSpec getMetadataUpdateSpec()
{
return metadataUpdateSpec;
}

View File

@ -30,7 +30,7 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
@ -70,7 +70,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("updaterJobSpec") MetadataStorageUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,

View File

@ -29,8 +29,8 @@ import java.util.List;
*/
public class MetadataStorageUpdaterJob implements Jobby
{
private final HadoopDruidIndexerConfig config;
@Inject
private MetadataStorageUpdaterJobHandler handler;
@ -46,8 +46,7 @@ public class MetadataStorageUpdaterJob implements Jobby
{
final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
final ObjectMapper mapper = HadoopDruidIndexerConfig.jsonMapper;
handler.publishSegments(segmentTable, segments, mapper);
handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.jsonMapper);
return true;
}

View File

@ -22,11 +22,11 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;
@ -354,8 +354,8 @@ public class HadoopIngestionSpecTest
HadoopIngestionSpec.class
);
final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final MetadataDbConnectorConfig connectorConfig = spec.get();
final MetadataStorageUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final MetadataStorageConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());

View File

@ -27,7 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;
@ -37,13 +37,13 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
private final IndexerMetadataCoordinator indexerDBCoordinator;
private final IndexerMetadataStorageCoordinator indexerDBCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
IndexerMetadataCoordinator indexerDBCoordinator,
IndexerMetadataStorageCoordinator indexerDBCoordinator,
ServiceEmitter emitter
)
{
@ -57,7 +57,7 @@ public class TaskActionToolbox
return taskLockbox;
}
public IndexerMetadataCoordinator getIndexerDBCoordinator()
public IndexerMetadataStorageCoordinator getIndexerDBCoordinator()
{
return indexerDBCoordinator;
}

View File

@ -32,8 +32,8 @@ import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
@ -47,24 +47,24 @@ import java.util.Map;
public class DbTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private final MetadataDbConnector metadataDbConnector;
private final MetadataTablesConfig dbTables;
private final MetadataStorageConnector metadataStorageConnector;
private final MetadataStorageTablesConfig dbTables;
private final TaskStorageConfig config;
private final MetadataActionHandler handler;
private final MetadataStorageActionHandler handler;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
public DbTaskStorage(
final ObjectMapper jsonMapper,
final MetadataDbConnector metadataDbConnector,
final MetadataTablesConfig dbTables,
final MetadataStorageConnector metadataStorageConnector,
final MetadataStorageTablesConfig dbTables,
final TaskStorageConfig config,
final MetadataActionHandler handler
final MetadataStorageActionHandler handler
)
{
this.jsonMapper = jsonMapper;
this.metadataDbConnector = metadataDbConnector;
this.metadataStorageConnector = metadataStorageConnector;
this.dbTables = dbTables;
this.config = config;
this.handler = handler;
@ -73,7 +73,7 @@ public class DbTaskStorage implements TaskStorage
@LifecycleStart
public void start()
{
metadataDbConnector.createTaskTables();
metadataStorageConnector.createTaskTables();
}
@LifecycleStop

View File

@ -42,7 +42,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
@ -101,7 +101,7 @@ public class TaskLifecycleTest
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private MockIndexerDBStorageCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
TaskStorageQueryAdapter tsqa = null;
@ -510,12 +510,12 @@ public class TaskLifecycleTest
return retVal;
}
private static class MockIndexerDBCoordinator extends IndexerSQLMetadataCoordinator
private static class MockIndexerDBStorageCoordinator extends IndexerSQLMetadataStorageCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private MockIndexerDBCoordinator()
private MockIndexerDBStorageCoordinator()
{
super(null, null, null);
}
@ -562,9 +562,9 @@ public class TaskLifecycleTest
}
}
private static MockIndexerDBCoordinator newMockMDC()
private static MockIndexerDBStorageCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
return new MockIndexerDBStorageCoordinator();
}
private static ServiceEmitter newMockEmitter()

View File

@ -1,15 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>mysql</artifactId>
</project>

View File

@ -1,179 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.mysql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
import java.util.Map;
public class MySQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(MySQLConnector.class);
private final DBI dbi;
@Inject
public MySQLConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
super(config, dbTables);
this.dbi = new DBI(getDatasource());
}
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, "
+ "start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
tableName
)
);
}
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
tableName
)
);
}
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
tableName
)
);
}
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
{
return String.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
}
public DBI getDBI() { return dbi; }
}

View File

@ -1,196 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.mysql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataStorageActionHandler;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import io.druid.storage.jdbc.mysql.MySQLConnector;
import io.druid.storage.jdbc.postgresql.PostgreSQLConnector;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
// TODO: change it to JDBC
public class MySQLMetadataStorageModule implements DruidModule
{
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindMySQL(binder);
bindPostgres(binder);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
}
private void bindMySQL(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("mysql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("mysql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
}
private void bindPostgres(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("postgresql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final SQLMetadataConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -56,11 +56,12 @@
<module>cassandra-storage</module>
<module>hdfs-storage</module>
<module>s3-extensions</module>
<module>jdbc-storage</module>
<module>kafka-seven</module>
<module>kafka-eight</module>
<module>rabbitmq</module>
<module>histogram</module>
<module>mysql-storage</module>
<module>postgres-storage</module>
</modules>
<dependencyManagement>

View File

@ -1,15 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>postgres</artifactId>
</project>

View File

@ -1,186 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.postgres;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
import java.util.Map;
public class PostgreSQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(PostgreSQLConnector.class);
private final DBI dbi;
@Inject
public PostgreSQLConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
super(config, dbTables);
this.dbi = new DBI(getDatasource());
}
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, "
+ "start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);",
tableName
)
);
}
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);",
tableName
)
);
}
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);",
tableName
)
);
}
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
{
return String.format(
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;",
tableName, keyColumn, valueColumn
);
}
public DBI getDBI() { return dbi; }
}

View File

@ -1,191 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.postgres;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataStorageActionHandler;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import io.druid.storage.jdbc.mysql.MySQLConnector;
import io.druid.storage.jdbc.postgresql.PostgreSQLConnector;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
// TODO: change it to JDBC
public class PostgresMetadataStorageModule implements DruidModule
{
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindMySQL(binder);
bindPostgres(binder);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
}
private void bindMySQL(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("mysql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("mysql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
}
private void bindPostgres(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("postgresql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final SQLMetadataConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -41,7 +41,7 @@ public class DerbyConnector extends SQLMetadataConnector
private final DBI dbi;
@Inject
public DerbyConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables)
public DerbyConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
super(config, dbTables);
this.dbi = new DBI(getConnectionFactory("druidDerbyDb"));
@ -187,7 +187,7 @@ public class DerbyConnector extends SQLMetadataConnector
@Override
public Void insertOrUpdate(
final String storageName,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
@ -204,15 +204,17 @@ public class DerbyConnector extends SQLMetadataConnector
handle.begin();
conn.setAutoCommit(false);
List<Map<String, Object>> entry = handle.createQuery(
String.format("SELECT * FROM %1$s WHERE %2$s=:key", storageName, keyColumn)
String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn)
).list();
if (entry == null || entry.isEmpty()) {
handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)", storageName, keyColumn, valueColumn))
handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
} else {
handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key", storageName, keyColumn, valueColumn))
handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();

View File

@ -38,7 +38,7 @@ public class DerbyMetadataRuleManager extends SQLMetadataRuleManager
public DerbyMetadataRuleManager(
@Json ObjectMapper jsonMapper,
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
Supplier<MetadataStorageTablesConfig> dbTables,
IDBI dbi
) {
super(jsonMapper, config, dbTables, dbi);

View File

@ -32,8 +32,8 @@ public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProv
{
private final ObjectMapper jsonMapper;
private final Supplier<MetadataRuleManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final MetadataDbConnector dbConnector;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final MetadataStorageConnector dbConnector;
private final Lifecycle lifecycle;
private final IDBI dbi;
@ -41,8 +41,8 @@ public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProv
public DerbyMetadataRuleManagerProvider(
ObjectMapper jsonMapper,
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
MetadataDbConnector dbConnector,
Supplier<MetadataStorageTablesConfig> dbTables,
MetadataStorageConnector dbConnector,
IDBI dbi,
Lifecycle lifecycle
)

View File

@ -19,13 +19,10 @@
package io.druid.db;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
@ -36,7 +33,6 @@ import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -52,7 +48,6 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
@ -74,7 +69,7 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager
private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final IDBI dbi;
@ -86,7 +81,7 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager
public DerbyMetadataSegmentManager(
ObjectMapper jsonMapper,
Supplier<MetadataSegmentManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
Supplier<MetadataStorageTablesConfig> dbTables,
IDBI dbi
)
{

View File

@ -24,15 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.IDBI;
public class DerbyMetadataSegmentManagerProvider implements MetadataSegmentManagerProvider
{
private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final MetadataDbConnector dbConnector;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final MetadataStorageConnector dbConnector;
private final IDBI dbi;
private final Lifecycle lifecycle;
@ -40,8 +39,8 @@ public class DerbyMetadataSegmentManagerProvider implements MetadataSegmentManag
public DerbyMetadataSegmentManagerProvider(
ObjectMapper jsonMapper,
Supplier<MetadataSegmentManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
MetadataDbConnector dbConnector,
Supplier<MetadataStorageTablesConfig> dbTables,
MetadataStorageConnector dbConnector,
IDBI dbi,
Lifecycle lifecycle
)

View File

@ -34,12 +34,12 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
public abstract class SQLMetadataConnector implements MetadataDbConnector
public abstract class SQLMetadataConnector implements MetadataStorageConnector
{
private final Supplier<MetadataDbConnectorConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final Supplier<MetadataStorageConnectorConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
protected SQLMetadataConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables)
protected SQLMetadataConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
this.config = config;
this.dbTables = dbTables;
@ -88,23 +88,23 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector
@Override
public void createTaskTables() {
if (config.get().isCreateTables()) {
final MetadataTablesConfig metadataTablesConfig = dbTables.get();
createTaskTable(getDBI(), metadataTablesConfig.getTasksTable());
createTaskLogTable(getDBI(), metadataTablesConfig.getTaskLogTable());
createTaskLockTable(getDBI(), metadataTablesConfig.getTaskLockTable());
final MetadataStorageTablesConfig metadataStorageTablesConfig = dbTables.get();
createTaskTable(getDBI(), metadataStorageTablesConfig.getTasksTable());
createTaskLogTable(getDBI(), metadataStorageTablesConfig.getTaskLogTable());
createTaskLockTable(getDBI(), metadataStorageTablesConfig.getTaskLockTable());
}
}
@Override
public Void insertOrUpdate(
final String storageName,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
final String insertOrUpdateStatement = insertOrUpdateStatement(storageName, keyColumn, valueColumn);
final String insertOrUpdateStatement = insertOrUpdateStatement(tableName, keyColumn, valueColumn);
return getDBI().withHandle(
new HandleCallback<Void>()
@ -124,14 +124,14 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector
@Override
public byte[] lookup(
final String storageName,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn,
storageName, keyColumn);
tableName, keyColumn);
return getDBI().withHandle(
new HandleCallback<byte[]>()
@ -167,11 +167,11 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector
);
}
public MetadataDbConnectorConfig getConfig() { return config.get(); }
public MetadataStorageConnectorConfig getConfig() { return config.get(); }
protected DataSource getDatasource()
{
MetadataDbConnectorConfig connectorConfig = config.get();
MetadataStorageConnectorConfig connectorConfig = config.get();
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());

View File

@ -33,7 +33,6 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.mysql.jdbc.Clob;
import io.druid.client.DruidServer;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
@ -124,7 +123,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
private final ObjectMapper jsonMapper;
private final Supplier<MetadataRuleManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final IDBI dbi;
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
@ -138,7 +137,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
public SQLMetadataRuleManager(
@Json ObjectMapper jsonMapper,
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
Supplier<MetadataStorageTablesConfig> dbTables,
IDBI dbi
)
{

View File

@ -32,8 +32,8 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
{
private final ObjectMapper jsonMapper;
private final Supplier<MetadataRuleManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final MetadataDbConnector dbConnector;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final MetadataStorageConnector dbConnector;
private final Lifecycle lifecycle;
private final IDBI dbi;
@ -41,8 +41,8 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
public SQLMetadataRuleManagerProvider(
ObjectMapper jsonMapper,
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
MetadataDbConnector dbConnector,
Supplier<MetadataStorageTablesConfig> dbTables,
MetadataStorageConnector dbConnector,
IDBI dbi,
Lifecycle lifecycle
)

View File

@ -19,13 +19,10 @@
package io.druid.db;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
@ -36,7 +33,6 @@ import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -52,11 +48,9 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -74,7 +68,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final IDBI dbi;
@ -86,7 +80,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public SQLMetadataSegmentManager(
ObjectMapper jsonMapper,
Supplier<MetadataSegmentManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
Supplier<MetadataStorageTablesConfig> dbTables,
IDBI dbi
)
{

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.IDBI;
@ -32,8 +31,8 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager
{
private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataTablesConfig> dbTables;
private final MetadataDbConnector dbConnector;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final MetadataStorageConnector dbConnector;
private final IDBI dbi;
private final Lifecycle lifecycle;
@ -41,8 +40,8 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager
public SQLMetadataSegmentManagerProvider(
ObjectMapper jsonMapper,
Supplier<MetadataSegmentManagerConfig> config,
Supplier<MetadataTablesConfig> dbTables,
MetadataDbConnector dbConnector,
Supplier<MetadataStorageTablesConfig> dbTables,
MetadataStorageConnector dbConnector,
IDBI dbi,
Lifecycle lifecycle
)

View File

@ -41,14 +41,14 @@ public class SQLMetadataSegmentPublisher implements DbSegmentPublisher
private static final Logger log = new Logger(SQLMetadataSegmentPublisher.class);
private final ObjectMapper jsonMapper;
private final MetadataTablesConfig config;
private final MetadataStorageTablesConfig config;
private final IDBI dbi;
private final String statement;
@Inject
public SQLMetadataSegmentPublisher(
ObjectMapper jsonMapper,
MetadataTablesConfig config,
MetadataStorageTablesConfig config,
IDBI dbi
)
{

View File

@ -36,7 +36,7 @@ public class SQLMetadataSegmentPublisherProvider implements MetadataSegmentPubli
@JacksonInject
@NotNull
private MetadataTablesConfig config = null;
private MetadataStorageTablesConfig config = null;
@JacksonInject
@NotNull

View File

@ -53,7 +53,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
/* Insert stuff. @returns number of entries inserted on success */
public void insert(
final String tableName,
final String Id,
final String id,
final String createdDate,
final String dataSource,
final byte[] payload,
@ -73,7 +73,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan
tableName
)
)
.bind("id", Id)
.bind("id", id)
.bind("created_date", createdDate)
.bind("datasource", dataSource)
.bind("payload", payload)

View File

@ -21,18 +21,18 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataRuleManagerConfig;
import io.druid.db.MetadataSegmentManagerConfig;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataStorageTablesConfig;
public class MetadataDbConfigModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataDbConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);

View File

@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DerbyStorageDruidModule;
import io.druid.guice.DerbyMetadataStorageDruidModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.ExtensionsConfig;
@ -334,7 +334,7 @@ public class Initialization
new DiscoveryModule(),
new ServerViewModule(),
new MetadataDbConfigModule(),
new DerbyStorageDruidModule(),
new DerbyMetadataStorageDruidModule(),
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new LocalDataStorageDruidModule(),

View File

@ -1,8 +1,116 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Created by jisookim on 9/26/14.
*/
public class MetadataSegmentManagerTest
{
private DerbyMetadataSegmentManager manager;
private IDBI dbi;
private List<Map<String, Object>> testRows;
@Before
public void setUp() throws Exception
{
dbi = EasyMock.createMock(IDBI.class);
manager = new DerbyMetadataSegmentManager(
new DefaultObjectMapper(),
Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase("test")),
dbi
);
Map<String, Object> map1 = Maps.newHashMap();
map1.put("id", "wikipedia_editstream_2012-03-15T00:00:00.000Z_2012-03-16T00:00:00.000Z_2012-03-16T00:36:30.848Z");
map1.put("dataSource", "wikipedia_editstream");
map1.put("created_date", "2012-03-23T22:27:21.957Z");
map1.put("start", "2012-03-15T00:00:00.000Z");
map1.put("end", "2012-03-16T00:00:00.000Z");
map1.put("partitioned", 0);
map1.put("version", "2012-03-16T00:36:30.848Z");
map1.put("used", 1);
map1.put(
"payload", "{\"dataSource\":\"wikipedia_editstream\",\"interval\":"
+ "\"2012-03-15T00:00:00.000/2012-03-16T00:00:00.000\",\"version\":\"2012-03-16T00:36:30.848Z\""
+ ",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"metamx-kafka-data\",\"key\":"
+ "\"wikipedia-editstream/v3/beta-index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index"
+ ".zip\"},\"dimensions\":\"page,namespace,language,user,anonymous,robot,newPage,unpatrolled,"
+ "geo,continent_code,country_name,city,region_lookup,dma_code,area_code,network,postal_code\""
+ ",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},"
+ "\"size\":26355195,\"identifier\":\"wikipedia_editstream_2012-03-15T00:00:00.000Z_2012-03-16"
+ "T00:00:00.000Z_2012-03-16T00:36:30.848Z\"}"
);
Map<String, Object> map2 = Maps.newHashMap();
map2.put("id", "twitterstream_2012-01-05T00:00:00.000Z_2012-01-06T00:00:00.000Z_2012-01-06T22:19:12.565Z");
map2.put("dataSource", "twitterstream");
map2.put("created_date", "2012-03-23T22:27:21.988Z");
map2.put("start", "2012-01-05T00:00:00.000Z");
map2.put("end", "2012-01-06T00:00:00.000Z");
map2.put("partitioned", 0);
map2.put("version", "2012-01-06T22:19:12.565Z");
map2.put("used", 1);
map2.put(
"payload", "{\"dataSource\":\"twitterstream\",\"interval\":\"2012-01-05T00:00:00.000/2012-01-06T00:00:00.000\","
+ "\"version\":\"2012-01-06T22:19:12.565Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":"
+ "\"metamx-twitterstream\",\"key\":\"index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip\"}"
+ ",\"dimensions\":\"user_name,user_lang,user_time_zone,user_location,"
+ "user_mention_name,has_mention,reply_to_name,first_hashtag,rt_name,url_domain,has_links,"
+ "has_geo,is_retweet,is_viral\",\"metrics\":\"count,tweet_length,num_followers,num_links,"
+ "num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},"
+ "\"size\":511804455,\"identifier\":"
+ "\"twitterstream_2012-01-05T00:00:00.000Z_2012-01-06T00:00:00.000Z_2012-01-06T22:19:12.565Z\"}"
);
testRows = Arrays.<Map<String, Object>>asList(map1, map2);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(dbi);
}
@Test
public void testPoll()
{
EasyMock.expect(dbi.withHandle(EasyMock.<HandleCallback>anyObject())).andReturn(testRows).times(2);
EasyMock.replay(dbi);
manager.start();
manager.poll();
manager.stop();
}
}

View File

@ -31,7 +31,7 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
@ -53,7 +53,7 @@ import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.overlord.DbTaskStorage;
import io.druid.indexing.overlord.ForkingTaskRunnerFactory;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskMaster;
@ -139,7 +139,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(IndexerMetadataCoordinator.class).to(IndexerSQLMetadataCoordinator.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)

View File

@ -31,7 +31,7 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.Binders;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys;
@ -50,7 +50,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
@ -183,7 +183,7 @@ public class CliPeon extends GuiceRunnable
// all of these bindings are so that we can run the peon in local mode
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(IndexerMetadataCoordinator.class).to(IndexerSQLMetadataCoordinator.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class);
taskActionBinder.addBinding("remote")
.to(RemoteTaskActionClientFactory.class).in(LazySingleton.class);

View File

@ -27,9 +27,9 @@ import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
@ -71,7 +71,7 @@ public class CreateTables extends GuiceRunnable
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(MetadataDbConnectorConfig.class), new MetadataDbConnectorConfig()
binder, Key.get(MetadataStorageConnectorConfig.class), new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
@ -93,7 +93,7 @@ public class CreateTables extends GuiceRunnable
}
);
JsonConfigProvider.bindInstance(
binder, Key.get(MetadataTablesConfig.class), MetadataTablesConfig.fromBase(base)
binder, Key.get(MetadataStorageTablesConfig.class), MetadataStorageTablesConfig.fromBase(base)
);
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1)
@ -107,7 +107,7 @@ public class CreateTables extends GuiceRunnable
public void run()
{
final Injector injector = makeInjector();
MetadataDbConnector dbConnector = injector.getInstance(MetadataDbConnector.class);
MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class);
dbConnector.createSegmentTable();
dbConnector.createRulesTable();
dbConnector.createConfigTable();