diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java index aa8e9b5abfc..c846d6edce2 100644 --- a/common/src/main/java/io/druid/common/config/ConfigManager.java +++ b/common/src/main/java/io/druid/common/config/ConfigManager.java @@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; -import io.druid.db.MetadataDbConnector; -import io.druid.db.MetadataTablesConfig; +import io.druid.db.MetadataStorageConnector; +import io.druid.db.MetadataStorageTablesConfig; import org.joda.time.Duration; import java.util.Arrays; @@ -48,7 +48,7 @@ public class ConfigManager private final Object lock = new Object(); private boolean started = false; - private final MetadataDbConnector dbConnector; + private final MetadataStorageConnector dbConnector; private final Supplier config; private final ScheduledExecutorService exec; @@ -58,7 +58,7 @@ public class ConfigManager private volatile ConfigManager.PollingCallable poller; @Inject - public ConfigManager(MetadataDbConnector dbConnector, Supplier dbTables, Supplier config) + public ConfigManager(MetadataStorageConnector dbConnector, Supplier dbTables, Supplier config) { this.dbConnector = dbConnector; this.config = config; diff --git a/common/src/main/java/io/druid/db/MetadataStorageConnector.java b/common/src/main/java/io/druid/db/MetadataStorageConnector.java index 71d1d075050..5b1712d5054 100644 --- a/common/src/main/java/io/druid/db/MetadataStorageConnector.java +++ b/common/src/main/java/io/druid/db/MetadataStorageConnector.java @@ -23,9 +23,8 @@ package io.druid.db; */ public interface MetadataStorageConnector { - public Void insertOrUpdate( - final String storageName, + final String tableName, final String keyColumn, final String valueColumn, final String key, @@ -34,7 +33,7 @@ public interface MetadataStorageConnector public byte[] lookup( - final String storageName, + final String tableName, final String keyColumn, final String valueColumn, final String key diff --git a/common/src/main/java/io/druid/guice/JacksonConfigManagerModule.java b/common/src/main/java/io/druid/guice/JacksonConfigManagerModule.java index de02e70a9a3..52f49eb34b1 100644 --- a/common/src/main/java/io/druid/guice/JacksonConfigManagerModule.java +++ b/common/src/main/java/io/druid/guice/JacksonConfigManagerModule.java @@ -27,8 +27,8 @@ import com.metamx.common.lifecycle.Lifecycle; import io.druid.common.config.ConfigManager; import io.druid.common.config.ConfigManagerConfig; import io.druid.common.config.JacksonConfigManager; -import io.druid.db.MetadataDbConnector; -import io.druid.db.MetadataTablesConfig; +import io.druid.db.MetadataStorageConnector; +import io.druid.db.MetadataStorageTablesConfig; /** */ @@ -43,8 +43,8 @@ public class JacksonConfigManagerModule implements Module @Provides @ManageLifecycle public ConfigManager getConfigManager( - final MetadataDbConnector dbConnector, - final Supplier dbTables, + final MetadataStorageConnector dbConnector, + final Supplier dbTables, final Supplier config, final Lifecycle lifecycle ) diff --git a/common/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/common/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index d344842f8cb..146b785347d 100644 --- a/common/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/common/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -34,8 +34,8 @@ public interface IndexerMetadataStorageCoordinator throws IOException; /** - * Attempts to insert a set of segments to the database. Returns the set of segments actually added (segments - * with identifiers already in the database will not be added). + * Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments + * with identifiers already in the metadata storage will not be added). * * @param segments set of segments to add * @return set of segments actually added diff --git a/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java index 603e6245284..bb7fe9bb540 100644 --- a/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java +++ b/common/src/main/java/io/druid/indexing/overlord/MetadataStorageActionHandler.java @@ -24,12 +24,10 @@ import java.util.Map; public interface MetadataStorageActionHandler { - // T retryCall(Action action); - /* Insert stuff. @returns number of entries inserted on success */ public void insert( String tableName, - String Id, + String id, String createdDate, String dataSource, byte[] payload, @@ -68,8 +66,4 @@ public interface MetadataStorageActionHandler /* Get locks for task with given ID */ public List> getTaskLocks(String tableName, String Id); - - /* Initialize and return new DbConnector */ - // fpublic MetadataDbConnector getConnector(Supplier config, Supplier dbTables); - } diff --git a/druid-postgres-storage/pom.xml b/druid-postgres-storage/pom.xml deleted file mode 100644 index 236ff1dadea..00000000000 --- a/druid-postgres-storage/pom.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - druid - io.druid - 0.7.0-SNAPSHOT - - 4.0.0 - - druid-postgres-storage - - - \ No newline at end of file diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index 64bc1267146..2f63690e2d9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -32,7 +32,7 @@ public class HadoopDruidIndexerJob implements Jobby { private static final Logger log = new Logger(HadoopDruidIndexerJob.class); private final HadoopDruidIndexerConfig config; - private final DbUpdaterJob dbUpdaterJob; + private final MetadataStorageUpdaterJob metadataStorageUpdaterJob; private IndexGeneratorJob indexJob; private volatile List publishedSegments = null; @@ -45,9 +45,9 @@ public class HadoopDruidIndexerJob implements Jobby this.config = config; if (config.isUpdaterJobSpecSet()) { - dbUpdaterJob = new DbUpdaterJob(config); + metadataStorageUpdaterJob = new MetadataStorageUpdaterJob(config); } else { - dbUpdaterJob = null; + metadataStorageUpdaterJob = null; } } @@ -60,8 +60,8 @@ public class HadoopDruidIndexerJob implements Jobby indexJob = new IndexGeneratorJob(config); jobs.add(indexJob); - if (dbUpdaterJob != null) { - jobs.add(dbUpdaterJob); + if (metadataStorageUpdaterJob != null) { + jobs.add(metadataStorageUpdaterJob); } else { log.info("No updaterJobSpec set, not uploading to database"); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java index 27a4866c56d..3aca5b6136b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java @@ -21,7 +21,7 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import io.druid.indexer.updater.DbUpdaterJobSpec; +import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; import io.druid.segment.indexing.IOConfig; import java.util.Map; @@ -32,12 +32,12 @@ import java.util.Map; public class HadoopIOConfig implements IOConfig { private final Map pathSpec; - private final DbUpdaterJobSpec metadataUpdateSpec; + private final MetadataStorageUpdaterJobSpec metadataUpdateSpec; private final String segmentOutputPath; public HadoopIOConfig( final @JsonProperty("inputSpec") Map pathSpec, - final @JsonProperty("metadataUpdateSpec") DbUpdaterJobSpec metadataUpdateSpec, + final @JsonProperty("metadataUpdateSpec") MetadataStorageUpdaterJobSpec metadataUpdateSpec, final @JsonProperty("segmentOutputPath") String segmentOutputPath ) { @@ -53,7 +53,7 @@ public class HadoopIOConfig implements IOConfig } @JsonProperty("metadataUpdateSpec") - public DbUpdaterJobSpec getMetadataUpdateSpec() + public MetadataStorageUpdaterJobSpec getMetadataUpdateSpec() { return metadataUpdateSpec; } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index b9947e81fe9..6ac4b7a64be 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -30,7 +30,7 @@ import io.druid.data.input.impl.TimestampSpec; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; import io.druid.indexer.rollup.DataRollupSpec; -import io.druid.indexer.updater.DbUpdaterJobSpec; +import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IngestionSpec; @@ -70,7 +70,7 @@ public class HadoopIngestionSpec extends IngestionSpec> shardSpecs, final @JsonProperty("overwriteFiles") boolean overwriteFiles, final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, - final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, + final @JsonProperty("updaterJobSpec") MetadataStorageUpdaterJobSpec updaterJobSpec, final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/MetadataStorageUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/MetadataStorageUpdaterJob.java index 9aafb9a82d3..0196a4ea724 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/MetadataStorageUpdaterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/MetadataStorageUpdaterJob.java @@ -29,8 +29,8 @@ import java.util.List; */ public class MetadataStorageUpdaterJob implements Jobby { - private final HadoopDruidIndexerConfig config; + @Inject private MetadataStorageUpdaterJobHandler handler; @@ -46,8 +46,7 @@ public class MetadataStorageUpdaterJob implements Jobby { final List segments = IndexGeneratorJob.getPublishedSegments(config); final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable(); - final ObjectMapper mapper = HadoopDruidIndexerConfig.jsonMapper; - handler.publishSegments(segmentTable, segments, mapper); + handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.jsonMapper); return true; } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java index 4cdbee2dedb..3ecb4c7b76a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java @@ -22,11 +22,11 @@ package io.druid.indexer; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import io.druid.db.MetadataDbConnectorConfig; +import io.druid.db.MetadataStorageConnectorConfig; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.partitions.RandomPartitionsSpec; import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import io.druid.indexer.updater.DbUpdaterJobSpec; +import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; import io.druid.jackson.DefaultObjectMapper; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import org.joda.time.Interval; @@ -354,8 +354,8 @@ public class HadoopIngestionSpecTest HadoopIngestionSpec.class ); - final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec(); - final MetadataDbConnectorConfig connectorConfig = spec.get(); + final MetadataStorageUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec(); + final MetadataStorageConnectorConfig connectorConfig = spec.get(); Assert.assertEquals("segments", spec.getSegmentTable()); Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java index 7b0eaf82db5..3815dcd6347 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java @@ -27,7 +27,7 @@ import com.metamx.common.ISE; import com.metamx.emitter.service.ServiceEmitter; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.Task; -import io.druid.indexing.overlord.IndexerMetadataCoordinator; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskLockbox; import io.druid.timeline.DataSegment; @@ -37,13 +37,13 @@ import java.util.Set; public class TaskActionToolbox { private final TaskLockbox taskLockbox; - private final IndexerMetadataCoordinator indexerDBCoordinator; + private final IndexerMetadataStorageCoordinator indexerDBCoordinator; private final ServiceEmitter emitter; @Inject public TaskActionToolbox( TaskLockbox taskLockbox, - IndexerMetadataCoordinator indexerDBCoordinator, + IndexerMetadataStorageCoordinator indexerDBCoordinator, ServiceEmitter emitter ) { @@ -57,7 +57,7 @@ public class TaskActionToolbox return taskLockbox; } - public IndexerMetadataCoordinator getIndexerDBCoordinator() + public IndexerMetadataStorageCoordinator getIndexerDBCoordinator() { return indexerDBCoordinator; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index f54ec2e2e49..262d6dd802a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -32,8 +32,8 @@ import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import io.druid.db.MetadataDbConnector; -import io.druid.db.MetadataTablesConfig; +import io.druid.db.MetadataStorageConnector; +import io.druid.db.MetadataStorageTablesConfig; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; @@ -47,24 +47,24 @@ import java.util.Map; public class DbTaskStorage implements TaskStorage { private final ObjectMapper jsonMapper; - private final MetadataDbConnector metadataDbConnector; - private final MetadataTablesConfig dbTables; + private final MetadataStorageConnector metadataStorageConnector; + private final MetadataStorageTablesConfig dbTables; private final TaskStorageConfig config; - private final MetadataActionHandler handler; + private final MetadataStorageActionHandler handler; private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject public DbTaskStorage( final ObjectMapper jsonMapper, - final MetadataDbConnector metadataDbConnector, - final MetadataTablesConfig dbTables, + final MetadataStorageConnector metadataStorageConnector, + final MetadataStorageTablesConfig dbTables, final TaskStorageConfig config, - final MetadataActionHandler handler + final MetadataStorageActionHandler handler ) { this.jsonMapper = jsonMapper; - this.metadataDbConnector = metadataDbConnector; + this.metadataStorageConnector = metadataStorageConnector; this.dbTables = dbTables; this.config = config; this.handler = handler; @@ -73,7 +73,7 @@ public class DbTaskStorage implements TaskStorage @LifecycleStart public void start() { - metadataDbConnector.createTaskTables(); + metadataStorageConnector.createTaskTables(); } @LifecycleStop diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index eefd67cda17..a3dfcc1a05c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -42,7 +42,7 @@ import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; -import io.druid.db.IndexerSQLMetadataCoordinator; +import io.druid.db.IndexerSQLMetadataStorageCoordinator; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; @@ -101,7 +101,7 @@ public class TaskLifecycleTest private TaskLockbox tl = null; private TaskQueue tq = null; private TaskRunner tr = null; - private MockIndexerDBCoordinator mdc = null; + private MockIndexerDBStorageCoordinator mdc = null; private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; TaskStorageQueryAdapter tsqa = null; @@ -510,12 +510,12 @@ public class TaskLifecycleTest return retVal; } - private static class MockIndexerDBCoordinator extends IndexerSQLMetadataCoordinator + private static class MockIndexerDBStorageCoordinator extends IndexerSQLMetadataStorageCoordinator { final private Set published = Sets.newHashSet(); final private Set nuked = Sets.newHashSet(); - private MockIndexerDBCoordinator() + private MockIndexerDBStorageCoordinator() { super(null, null, null); } @@ -562,9 +562,9 @@ public class TaskLifecycleTest } } - private static MockIndexerDBCoordinator newMockMDC() + private static MockIndexerDBStorageCoordinator newMockMDC() { - return new MockIndexerDBCoordinator(); + return new MockIndexerDBStorageCoordinator(); } private static ServiceEmitter newMockEmitter() diff --git a/mysql/pom.xml b/mysql/pom.xml deleted file mode 100644 index 21c7e409888..00000000000 --- a/mysql/pom.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - druid - io.druid - 0.7.0-SNAPSHOT - - 4.0.0 - - mysql - - - \ No newline at end of file diff --git a/mysql/src/main/java/io/druid/storage/mysql/MySQLConnector.java b/mysql/src/main/java/io/druid/storage/mysql/MySQLConnector.java deleted file mode 100644 index bc1fd7a9ef9..00000000000 --- a/mysql/src/main/java/io/druid/storage/mysql/MySQLConnector.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.storage.mysql; - -import com.google.common.base.Supplier; -import com.google.inject.Inject; -import com.metamx.common.logger.Logger; -import io.druid.db.MetadataStorageConnectorConfig; -import io.druid.db.MetadataStorageTablesConfig; -import io.druid.db.SQLMetadataConnector; -import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.IDBI; -import org.skife.jdbi.v2.tweak.HandleCallback; - -import java.util.List; -import java.util.Map; - -public class MySQLConnector extends SQLMetadataConnector -{ - private static final Logger log = new Logger(MySQLConnector.class); - private final DBI dbi; - - @Inject - public MySQLConnector(Supplier config, Supplier dbTables) - { - super(config, dbTables); - this.dbi = new DBI(getDatasource()); - } - - public void createTable(final IDBI dbi, final String tableName, final String sql) - { - try { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } - return null; - } - } - ); - } - catch (Exception e) { - log.warn(e, "Exception creating table"); - } - } - - public void createSegmentTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, " - + "start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, " - + "used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", - tableName - ) - ); - } - - public void createRulesTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", - tableName - ) - ); - } - - public void createConfigTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", - tableName - ) - ); - } - - public void createTaskTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", - tableName - ) - ); - } - - public void createTaskLogTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", - tableName - ) - ); - } - - public void createTaskLockTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", - tableName - ) - ); - } - - public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn) - { - return String.format( - "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", - tableName, keyColumn, valueColumn - ); - } - - public DBI getDBI() { return dbi; } - -} diff --git a/mysql/src/main/java/io/druid/storage/mysql/MySQLMetadataStorageModule.java b/mysql/src/main/java/io/druid/storage/mysql/MySQLMetadataStorageModule.java deleted file mode 100644 index 23cb843504a..00000000000 --- a/mysql/src/main/java/io/druid/storage/mysql/MySQLMetadataStorageModule.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.storage.mysql; - -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Key; -import com.google.inject.Provides; -import io.druid.db.IndexerSQLMetadataStorageCoordinator; -import io.druid.db.MetadataStorageConnectorConfig; -import io.druid.db.MetadataRuleManager; -import io.druid.db.MetadataRuleManagerProvider; -import io.druid.db.MetadataSegmentManager; -import io.druid.db.MetadataSegmentManagerProvider; -import io.druid.db.MetadataStorageConnector; -import io.druid.db.MetadataStorageTablesConfig; -import io.druid.db.MetadataSegmentPublisherProvider; -import io.druid.db.SQLMetadataConnector; -import io.druid.db.SQLMetadataStorageActionHandler; -import io.druid.db.SQLMetadataRuleManager; -import io.druid.db.SQLMetadataRuleManagerProvider; -import io.druid.db.SQLMetadataSegmentManager; -import io.druid.db.SQLMetadataSegmentManagerProvider; -import io.druid.db.SQLMetadataSegmentPublisher; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.LazySingleton; -import io.druid.guice.PolyBind; -import io.druid.indexer.MetadataStorageUpdaterJobHandler; -import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler; -import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import io.druid.indexing.overlord.MetadataStorageActionHandler; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.DbSegmentPublisher; -import io.druid.db.SQLMetadataSegmentPublisherProvider; -import io.druid.storage.jdbc.mysql.MySQLConnector; -import io.druid.storage.jdbc.postgresql.PostgreSQLConnector; -import org.skife.jdbi.v2.IDBI; - -import java.util.List; - -// TODO: change it to JDBC -public class MySQLMetadataStorageModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of(); - } - - @Override - public void configure(Binder binder) - { - bindMySQL(binder); - bindPostgres(binder); - JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class); - JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class); - } - - private void bindMySQL(Binder binder) { - PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class)) - .addBinding("mysql") - .to(MySQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class)) - .addBinding("mysql") - .to(MySQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class)) - .addBinding("mysql") - .to(SQLMetadataRuleManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class)) - .addBinding("mysql") - .to(SQLMetadataRuleManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentPublisher.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentPublisherProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class)) - .addBinding("mysql") - .to(SQLMetadataStorageActionHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) - .addBinding("mysql") - .to(IndexerSQLMetadataStorageCoordinator.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) - .addBinding("mysql") - .to(SQLMetadataStorageUpdaterJobHandler.class) - .in(LazySingleton.class); - } - - private void bindPostgres(Binder binder) { - PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class)) - .addBinding("postgresql") - .to(PostgreSQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class)) - .addBinding("postgresql") - .to(PostgreSQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class)) - .addBinding("postgresql") - .to(SQLMetadataRuleManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataRuleManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentPublisher.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class)) - .addBinding("postgresql") - .to(SQLMetadataStorageActionHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentPublisherProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) - .addBinding("postgresql") - .to(SQLMetadataStorageUpdaterJobHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) - .addBinding("postgresql") - .to(IndexerSQLMetadataStorageCoordinator.class) - .in(LazySingleton.class); - } - - @Provides - @LazySingleton - public IDBI getDbi(final SQLMetadataConnector dbConnector) - { - return dbConnector.getDBI(); - } -} \ No newline at end of file diff --git a/mysql/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/mysql/src/main/resources/META-INF/services/io.druid.initialization.DruidModule deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/pom.xml b/pom.xml index 2f5c984031c..061b686efc9 100644 --- a/pom.xml +++ b/pom.xml @@ -56,11 +56,12 @@ cassandra-storage hdfs-storage s3-extensions - jdbc-storage kafka-seven kafka-eight rabbitmq histogram + mysql-storage + postgres-storage diff --git a/postgres/pom.xml b/postgres/pom.xml deleted file mode 100644 index 2e21c485291..00000000000 --- a/postgres/pom.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - druid - io.druid - 0.7.0-SNAPSHOT - - 4.0.0 - - postgres - - - \ No newline at end of file diff --git a/postgres/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java b/postgres/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java deleted file mode 100644 index fd36ec172c0..00000000000 --- a/postgres/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.storage.postgres; - -import com.google.common.base.Supplier; -import com.google.inject.Inject; -import com.metamx.common.logger.Logger; -import io.druid.db.MetadataStorageConnectorConfig; -import io.druid.db.MetadataStorageTablesConfig; -import io.druid.db.SQLMetadataConnector; -import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.IDBI; -import org.skife.jdbi.v2.tweak.HandleCallback; - -import java.util.List; -import java.util.Map; - -public class PostgreSQLConnector extends SQLMetadataConnector -{ - private static final Logger log = new Logger(PostgreSQLConnector.class); - private final DBI dbi; - - @Inject - public PostgreSQLConnector(Supplier config, Supplier dbTables) - { - super(config, dbTables); - this.dbi = new DBI(getDatasource()); - - } - - public void createTable(final IDBI dbi, final String tableName, final String sql) - { - try { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - List> table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName)); - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } - return null; - } - } - ); - } - catch (Exception e) { - log.warn(e, "Exception creating table"); - } - } - - public void createSegmentTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, " - + "start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, " - + "used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + - "CREATE INDEX ON %1$s(dataSource);"+ - "CREATE INDEX ON %1$s(used);", - tableName - ) - ); - } - - public void createRulesTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ - "CREATE INDEX ON %1$s(dataSource);", - tableName - ) - ); - } - - public void createConfigTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))", - tableName - ) - ); - } - - public void createTaskTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id varchar(255) NOT NULL,\n" - + " created_date TEXT NOT NULL,\n" - + " datasource varchar(255) NOT NULL,\n" - + " payload bytea NOT NULL,\n" - + " status_payload bytea NOT NULL,\n" - + " active SMALLINT NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (id)\n" - + ");\n" + - "CREATE INDEX ON %1$s(active, created_date);", - tableName - ) - ); - } - - public void createTaskLogTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id bigserial NOT NULL,\n" - + " task_id varchar(255) DEFAULT NULL,\n" - + " log_payload bytea,\n" - + " PRIMARY KEY (id)\n" - + ");\n"+ - "CREATE INDEX ON %1$s(task_id);", - tableName - ) - ); - } - - public void createTaskLockTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id bigserial NOT NULL,\n" - + " task_id varchar(255) DEFAULT NULL,\n" - + " lock_payload bytea,\n" - + " PRIMARY KEY (id)\n" - + ");\n"+ - "CREATE INDEX ON %1$s(task_id);", - tableName - ) - ); - } - - public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn) - { - return String.format( - "BEGIN;\n" + - "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + - "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + - " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + - "COMMIT;", - tableName, keyColumn, valueColumn - ); - } - - public DBI getDBI() { return dbi; } -} diff --git a/postgres/src/main/java/io/druid/storage/postgres/PostgresMetadataStorageModule.java b/postgres/src/main/java/io/druid/storage/postgres/PostgresMetadataStorageModule.java deleted file mode 100644 index 2cd3154bfea..00000000000 --- a/postgres/src/main/java/io/druid/storage/postgres/PostgresMetadataStorageModule.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.storage.postgres; - -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Key; -import com.google.inject.Provides; -import io.druid.db.IndexerSQLMetadataStorageCoordinator; -import io.druid.db.MetadataStorageConnectorConfig; -import io.druid.db.MetadataRuleManager; -import io.druid.db.MetadataRuleManagerProvider; -import io.druid.db.MetadataSegmentManager; -import io.druid.db.MetadataSegmentManagerProvider; -import io.druid.db.MetadataStorageConnector; -import io.druid.db.MetadataStorageTablesConfig; -import io.druid.db.MetadataSegmentPublisherProvider; -import io.druid.db.SQLMetadataConnector; -import io.druid.db.SQLMetadataStorageActionHandler; -import io.druid.db.SQLMetadataRuleManager; -import io.druid.db.SQLMetadataRuleManagerProvider; -import io.druid.db.SQLMetadataSegmentManager; -import io.druid.db.SQLMetadataSegmentManagerProvider; -import io.druid.db.SQLMetadataSegmentPublisher; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.LazySingleton; -import io.druid.guice.PolyBind; -import io.druid.indexer.MetadataStorageUpdaterJobHandler; -import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler; -import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import io.druid.indexing.overlord.MetadataStorageActionHandler; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.DbSegmentPublisher; -import io.druid.db.SQLMetadataSegmentPublisherProvider; -import io.druid.storage.jdbc.mysql.MySQLConnector; -import io.druid.storage.jdbc.postgresql.PostgreSQLConnector; -import org.skife.jdbi.v2.IDBI; - -import java.util.List; - -// TODO: change it to JDBC -public class PostgresMetadataStorageModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of(); - } - - @Override - public void configure(Binder binder) - { - bindMySQL(binder); - bindPostgres(binder); - JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class); - JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class); - } - - private void bindMySQL(Binder binder) { - PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class)) - .addBinding("mysql") - .to(MySQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class)) - .addBinding("mysql") - .to(SQLMetadataRuleManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class)) - .addBinding("mysql") - .to(SQLMetadataRuleManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentPublisher.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class)) - .addBinding("mysql") - .to(SQLMetadataSegmentPublisherProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class)) - .addBinding("mysql") - .to(SQLMetadataStorageActionHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) - .addBinding("mysql") - .to(IndexerSQLMetadataStorageCoordinator.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) - .addBinding("mysql") - .to(SQLMetadataStorageUpdaterJobHandler.class) - .in(LazySingleton.class); - } - - private void bindPostgres(Binder binder) { - PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class)) - .addBinding("postgresql") - .to(PostgreSQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class)) - .addBinding("postgresql") - .to(PostgreSQLConnector.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class)) - .addBinding("postgresql") - .to(SQLMetadataRuleManager.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataRuleManagerProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentPublisher.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class)) - .addBinding("postgresql") - .to(SQLMetadataStorageActionHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class)) - .addBinding("postgresql") - .to(SQLMetadataSegmentPublisherProvider.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) - .addBinding("postgresql") - .to(SQLMetadataStorageUpdaterJobHandler.class) - .in(LazySingleton.class); - - PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) - .addBinding("postgresql") - .to(IndexerSQLMetadataStorageCoordinator.class) - .in(LazySingleton.class); - } - - @Provides - @LazySingleton - public IDBI getDbi(final SQLMetadataConnector dbConnector) - { - return dbConnector.getDBI(); - } -} \ No newline at end of file diff --git a/postgres/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/postgres/src/main/resources/META-INF/services/io.druid.initialization.DruidModule deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/server/src/main/java/io/druid/db/DerbyConnector.java b/server/src/main/java/io/druid/db/DerbyConnector.java index 2aac84290e8..72c66440912 100644 --- a/server/src/main/java/io/druid/db/DerbyConnector.java +++ b/server/src/main/java/io/druid/db/DerbyConnector.java @@ -41,7 +41,7 @@ public class DerbyConnector extends SQLMetadataConnector private final DBI dbi; @Inject - public DerbyConnector(Supplier config, Supplier dbTables) + public DerbyConnector(Supplier config, Supplier dbTables) { super(config, dbTables); this.dbi = new DBI(getConnectionFactory("druidDerbyDb")); @@ -187,7 +187,7 @@ public class DerbyConnector extends SQLMetadataConnector @Override public Void insertOrUpdate( - final String storageName, + final String tableName, final String keyColumn, final String valueColumn, final String key, @@ -204,15 +204,17 @@ public class DerbyConnector extends SQLMetadataConnector handle.begin(); conn.setAutoCommit(false); List> entry = handle.createQuery( - String.format("SELECT * FROM %1$s WHERE %2$s=:key", storageName, keyColumn) + String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn) ).list(); if (entry == null || entry.isEmpty()) { - handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)", storageName, keyColumn, valueColumn)) + handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)", + tableName, keyColumn, valueColumn)) .bind("key", key) .bind("value", value) .execute(); } else { - handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key", storageName, keyColumn, valueColumn)) + handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key", + tableName, keyColumn, valueColumn)) .bind("key", key) .bind("value", value) .execute(); diff --git a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java b/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java index 891f3a654a3..055b0f5dbb1 100644 --- a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java @@ -38,7 +38,7 @@ public class DerbyMetadataRuleManager extends SQLMetadataRuleManager public DerbyMetadataRuleManager( @Json ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, + Supplier dbTables, IDBI dbi ) { super(jsonMapper, config, dbTables, dbi); diff --git a/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java b/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java index dd63bb4babd..88015b014e8 100644 --- a/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java +++ b/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java @@ -32,8 +32,8 @@ public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProv { private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; - private final MetadataDbConnector dbConnector; + private final Supplier dbTables; + private final MetadataStorageConnector dbConnector; private final Lifecycle lifecycle; private final IDBI dbi; @@ -41,8 +41,8 @@ public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProv public DerbyMetadataRuleManagerProvider( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, - MetadataDbConnector dbConnector, + Supplier dbTables, + MetadataStorageConnector dbConnector, IDBI dbi, Lifecycle lifecycle ) diff --git a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java b/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java index 5ff96bf40d3..83fe971970d 100644 --- a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java @@ -19,13 +19,10 @@ package io.druid.db; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.common.MapUtils; @@ -36,7 +33,6 @@ import com.metamx.common.logger.Logger; import io.druid.client.DruidDataSource; import io.druid.concurrent.Execs; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordinator.rules.Rule; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -52,7 +48,6 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; -import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; @@ -74,7 +69,7 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; + private final Supplier dbTables; private final AtomicReference> dataSources; private final IDBI dbi; @@ -86,7 +81,7 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager public DerbyMetadataSegmentManager( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, + Supplier dbTables, IDBI dbi ) { diff --git a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManagerProvider.java b/server/src/main/java/io/druid/db/DerbyMetadataSegmentManagerProvider.java index 44e9fabdf38..2cfa35051c5 100644 --- a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManagerProvider.java +++ b/server/src/main/java/io/druid/db/DerbyMetadataSegmentManagerProvider.java @@ -24,15 +24,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.common.lifecycle.Lifecycle; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.IDBI; public class DerbyMetadataSegmentManagerProvider implements MetadataSegmentManagerProvider { private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; - private final MetadataDbConnector dbConnector; + private final Supplier dbTables; + private final MetadataStorageConnector dbConnector; private final IDBI dbi; private final Lifecycle lifecycle; @@ -40,8 +39,8 @@ public class DerbyMetadataSegmentManagerProvider implements MetadataSegmentManag public DerbyMetadataSegmentManagerProvider( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, - MetadataDbConnector dbConnector, + Supplier dbTables, + MetadataStorageConnector dbConnector, IDBI dbi, Lifecycle lifecycle ) diff --git a/server/src/main/java/io/druid/db/SQLMetadataConnector.java b/server/src/main/java/io/druid/db/SQLMetadataConnector.java index 903da1d9449..5189b498ebb 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/db/SQLMetadataConnector.java @@ -34,12 +34,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; -public abstract class SQLMetadataConnector implements MetadataDbConnector +public abstract class SQLMetadataConnector implements MetadataStorageConnector { - private final Supplier config; - private final Supplier dbTables; + private final Supplier config; + private final Supplier dbTables; - protected SQLMetadataConnector(Supplier config, Supplier dbTables) + protected SQLMetadataConnector(Supplier config, Supplier dbTables) { this.config = config; this.dbTables = dbTables; @@ -88,23 +88,23 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector @Override public void createTaskTables() { if (config.get().isCreateTables()) { - final MetadataTablesConfig metadataTablesConfig = dbTables.get(); - createTaskTable(getDBI(), metadataTablesConfig.getTasksTable()); - createTaskLogTable(getDBI(), metadataTablesConfig.getTaskLogTable()); - createTaskLockTable(getDBI(), metadataTablesConfig.getTaskLockTable()); + final MetadataStorageTablesConfig metadataStorageTablesConfig = dbTables.get(); + createTaskTable(getDBI(), metadataStorageTablesConfig.getTasksTable()); + createTaskLogTable(getDBI(), metadataStorageTablesConfig.getTaskLogTable()); + createTaskLockTable(getDBI(), metadataStorageTablesConfig.getTaskLockTable()); } } @Override public Void insertOrUpdate( - final String storageName, + final String tableName, final String keyColumn, final String valueColumn, final String key, final byte[] value ) throws Exception { - final String insertOrUpdateStatement = insertOrUpdateStatement(storageName, keyColumn, valueColumn); + final String insertOrUpdateStatement = insertOrUpdateStatement(tableName, keyColumn, valueColumn); return getDBI().withHandle( new HandleCallback() @@ -124,14 +124,14 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector @Override public byte[] lookup( - final String storageName, + final String tableName, final String keyColumn, final String valueColumn, final String key ) { final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn, - storageName, keyColumn); + tableName, keyColumn); return getDBI().withHandle( new HandleCallback() @@ -167,11 +167,11 @@ public abstract class SQLMetadataConnector implements MetadataDbConnector ); } - public MetadataDbConnectorConfig getConfig() { return config.get(); } + public MetadataStorageConnectorConfig getConfig() { return config.get(); } protected DataSource getDatasource() { - MetadataDbConnectorConfig connectorConfig = config.get(); + MetadataStorageConnectorConfig connectorConfig = config.get(); BasicDataSource dataSource = new BasicDataSource(); dataSource.setUsername(connectorConfig.getUser()); diff --git a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java index 4a7404a2e3f..4b251759e05 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java @@ -33,7 +33,6 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; -import com.mysql.jdbc.Clob; import io.druid.client.DruidServer; import io.druid.concurrent.Execs; import io.druid.guice.ManageLifecycle; @@ -124,7 +123,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; + private final Supplier dbTables; private final IDBI dbi; private final AtomicReference>> rules; @@ -138,7 +137,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager public SQLMetadataRuleManager( @Json ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, + Supplier dbTables, IDBI dbi ) { diff --git a/server/src/main/java/io/druid/db/SQLMetadataRuleManagerProvider.java b/server/src/main/java/io/druid/db/SQLMetadataRuleManagerProvider.java index a83b20a9c58..411fa93dc03 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataRuleManagerProvider.java +++ b/server/src/main/java/io/druid/db/SQLMetadataRuleManagerProvider.java @@ -32,8 +32,8 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid { private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; - private final MetadataDbConnector dbConnector; + private final Supplier dbTables; + private final MetadataStorageConnector dbConnector; private final Lifecycle lifecycle; private final IDBI dbi; @@ -41,8 +41,8 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid public SQLMetadataRuleManagerProvider( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, - MetadataDbConnector dbConnector, + Supplier dbTables, + MetadataStorageConnector dbConnector, IDBI dbi, Lifecycle lifecycle ) diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java index 2406b306074..c9f7832d7b0 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java @@ -19,13 +19,10 @@ package io.druid.db; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.common.MapUtils; @@ -36,7 +33,6 @@ import com.metamx.common.logger.Logger; import io.druid.client.DruidDataSource; import io.druid.concurrent.Execs; import io.druid.guice.ManageLifecycle; -import io.druid.server.coordinator.rules.Rule; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -52,11 +48,9 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; -import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -74,7 +68,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; + private final Supplier dbTables; private final AtomicReference> dataSources; private final IDBI dbi; @@ -86,7 +80,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager public SQLMetadataSegmentManager( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, + Supplier dbTables, IDBI dbi ) { diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentManagerProvider.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentManagerProvider.java index 19b795e899d..add1aface92 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentManagerProvider.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentManagerProvider.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.common.lifecycle.Lifecycle; -import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.IDBI; @@ -32,8 +31,8 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager { private final ObjectMapper jsonMapper; private final Supplier config; - private final Supplier dbTables; - private final MetadataDbConnector dbConnector; + private final Supplier dbTables; + private final MetadataStorageConnector dbConnector; private final IDBI dbi; private final Lifecycle lifecycle; @@ -41,8 +40,8 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager public SQLMetadataSegmentManagerProvider( ObjectMapper jsonMapper, Supplier config, - Supplier dbTables, - MetadataDbConnector dbConnector, + Supplier dbTables, + MetadataStorageConnector dbConnector, IDBI dbi, Lifecycle lifecycle ) diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java index 7b2a03cadb3..22662229333 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java @@ -41,14 +41,14 @@ public class SQLMetadataSegmentPublisher implements DbSegmentPublisher private static final Logger log = new Logger(SQLMetadataSegmentPublisher.class); private final ObjectMapper jsonMapper; - private final MetadataTablesConfig config; + private final MetadataStorageTablesConfig config; private final IDBI dbi; private final String statement; @Inject public SQLMetadataSegmentPublisher( ObjectMapper jsonMapper, - MetadataTablesConfig config, + MetadataStorageTablesConfig config, IDBI dbi ) { diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisherProvider.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisherProvider.java index 52eaec19de6..d7d3b41d474 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisherProvider.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisherProvider.java @@ -36,7 +36,7 @@ public class SQLMetadataSegmentPublisherProvider implements MetadataSegmentPubli @JacksonInject @NotNull - private MetadataTablesConfig config = null; + private MetadataStorageTablesConfig config = null; @JacksonInject @NotNull diff --git a/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java index 55391d215c5..492c9153ffb 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/db/SQLMetadataStorageActionHandler.java @@ -53,7 +53,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan /* Insert stuff. @returns number of entries inserted on success */ public void insert( final String tableName, - final String Id, + final String id, final String createdDate, final String dataSource, final byte[] payload, @@ -73,7 +73,7 @@ public class SQLMetadataStorageActionHandler implements MetadataStorageActionHan tableName ) ) - .bind("id", Id) + .bind("id", id) .bind("created_date", createdDate) .bind("datasource", dataSource) .bind("payload", payload) diff --git a/server/src/main/java/io/druid/guice/MetadataDbConfigModule.java b/server/src/main/java/io/druid/guice/MetadataDbConfigModule.java index 23081c35eae..cf3a18e1ac2 100644 --- a/server/src/main/java/io/druid/guice/MetadataDbConfigModule.java +++ b/server/src/main/java/io/druid/guice/MetadataDbConfigModule.java @@ -21,18 +21,18 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; -import io.druid.db.MetadataDbConnectorConfig; +import io.druid.db.MetadataStorageConnectorConfig; import io.druid.db.MetadataRuleManagerConfig; import io.druid.db.MetadataSegmentManagerConfig; -import io.druid.db.MetadataTablesConfig; +import io.druid.db.MetadataStorageTablesConfig; public class MetadataDbConfigModule implements Module { @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.db.tables", MetadataTablesConfig.class); - JsonConfigProvider.bind(binder, "druid.db.connector", MetadataDbConnectorConfig.class); + JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class); + JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class); JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class); diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 5db88a0a18b..32c6e89e93e 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule; import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; -import io.druid.guice.DerbyStorageDruidModule; +import io.druid.guice.DerbyMetadataStorageDruidModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; import io.druid.guice.ExtensionsConfig; @@ -334,7 +334,7 @@ public class Initialization new DiscoveryModule(), new ServerViewModule(), new MetadataDbConfigModule(), - new DerbyStorageDruidModule(), + new DerbyMetadataStorageDruidModule(), new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), new LocalDataStorageDruidModule(), diff --git a/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java b/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java index bc801611ec0..42c50903353 100644 --- a/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java @@ -1,8 +1,116 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.db; +import com.google.common.base.Suppliers; +import com.google.common.collect.Maps; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + /** - * Created by jisookim on 9/26/14. */ public class MetadataSegmentManagerTest { + private DerbyMetadataSegmentManager manager; + private IDBI dbi; + private List> testRows; + + @Before + public void setUp() throws Exception + { + dbi = EasyMock.createMock(IDBI.class); + manager = new DerbyMetadataSegmentManager( + new DefaultObjectMapper(), + Suppliers.ofInstance(new MetadataSegmentManagerConfig()), + Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase("test")), + dbi + ); + + Map map1 = Maps.newHashMap(); + map1.put("id", "wikipedia_editstream_2012-03-15T00:00:00.000Z_2012-03-16T00:00:00.000Z_2012-03-16T00:36:30.848Z"); + map1.put("dataSource", "wikipedia_editstream"); + map1.put("created_date", "2012-03-23T22:27:21.957Z"); + map1.put("start", "2012-03-15T00:00:00.000Z"); + map1.put("end", "2012-03-16T00:00:00.000Z"); + map1.put("partitioned", 0); + map1.put("version", "2012-03-16T00:36:30.848Z"); + map1.put("used", 1); + map1.put( + "payload", "{\"dataSource\":\"wikipedia_editstream\",\"interval\":" + + "\"2012-03-15T00:00:00.000/2012-03-16T00:00:00.000\",\"version\":\"2012-03-16T00:36:30.848Z\"" + + ",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"metamx-kafka-data\",\"key\":" + + "\"wikipedia-editstream/v3/beta-index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index" + + ".zip\"},\"dimensions\":\"page,namespace,language,user,anonymous,robot,newPage,unpatrolled," + + "geo,continent_code,country_name,city,region_lookup,dma_code,area_code,network,postal_code\"" + + ",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"}," + + "\"size\":26355195,\"identifier\":\"wikipedia_editstream_2012-03-15T00:00:00.000Z_2012-03-16" + + "T00:00:00.000Z_2012-03-16T00:36:30.848Z\"}" + ); + + Map map2 = Maps.newHashMap(); + map2.put("id", "twitterstream_2012-01-05T00:00:00.000Z_2012-01-06T00:00:00.000Z_2012-01-06T22:19:12.565Z"); + map2.put("dataSource", "twitterstream"); + map2.put("created_date", "2012-03-23T22:27:21.988Z"); + map2.put("start", "2012-01-05T00:00:00.000Z"); + map2.put("end", "2012-01-06T00:00:00.000Z"); + map2.put("partitioned", 0); + map2.put("version", "2012-01-06T22:19:12.565Z"); + map2.put("used", 1); + map2.put( + "payload", "{\"dataSource\":\"twitterstream\",\"interval\":\"2012-01-05T00:00:00.000/2012-01-06T00:00:00.000\"," + + "\"version\":\"2012-01-06T22:19:12.565Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":" + + "\"metamx-twitterstream\",\"key\":\"index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip\"}" + + ",\"dimensions\":\"user_name,user_lang,user_time_zone,user_location," + + "user_mention_name,has_mention,reply_to_name,first_hashtag,rt_name,url_domain,has_links," + + "has_geo,is_retweet,is_viral\",\"metrics\":\"count,tweet_length,num_followers,num_links," + + "num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"}," + + "\"size\":511804455,\"identifier\":" + + "\"twitterstream_2012-01-05T00:00:00.000Z_2012-01-06T00:00:00.000Z_2012-01-06T22:19:12.565Z\"}" + ); + + testRows = Arrays.>asList(map1, map2); + } + + @After + public void tearDown() throws Exception + { + EasyMock.verify(dbi); + } + + @Test + public void testPoll() + { + EasyMock.expect(dbi.withHandle(EasyMock.anyObject())).andReturn(testRows).times(2); + EasyMock.replay(dbi); + + manager.start(); + manager.poll(); + manager.stop(); + } } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 8326c7872c0..59ee930fb77 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -31,7 +31,7 @@ import com.google.inject.servlet.GuiceFilter; import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; -import io.druid.db.IndexerSQLMetadataCoordinator; +import io.druid.db.IndexerSQLMetadataStorageCoordinator; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceTaskLogsModule; @@ -53,7 +53,7 @@ import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import io.druid.indexing.overlord.DbTaskStorage; import io.druid.indexing.overlord.ForkingTaskRunnerFactory; import io.druid.indexing.overlord.HeapMemoryTaskStorage; -import io.druid.indexing.overlord.IndexerMetadataCoordinator; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.RemoteTaskRunnerFactory; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskMaster; @@ -139,7 +139,7 @@ public class CliOverlord extends ServerRunnable binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class); - binder.bind(IndexerMetadataCoordinator.class).to(IndexerSQLMetadataCoordinator.class).in(LazySingleton.class); + binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class); binder.bind(TaskLockbox.class).in(LazySingleton.class); binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); binder.bind(ResourceManagementSchedulerFactory.class) diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index d3ab47dccd4..a3a136dd78c 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -31,7 +31,7 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; -import io.druid.db.IndexerSQLMetadataCoordinator; +import io.druid.db.IndexerSQLMetadataStorageCoordinator; import io.druid.guice.Binders; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.Jerseys; @@ -50,7 +50,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.overlord.HeapMemoryTaskStorage; -import io.druid.indexing.overlord.IndexerMetadataCoordinator; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -183,7 +183,7 @@ public class CliPeon extends GuiceRunnable // all of these bindings are so that we can run the peon in local mode binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class); - binder.bind(IndexerMetadataCoordinator.class).to(IndexerSQLMetadataCoordinator.class).in(LazySingleton.class); + binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class); taskActionBinder.addBinding("remote") .to(RemoteTaskActionClientFactory.class).in(LazySingleton.class); diff --git a/services/src/main/java/io/druid/cli/CreateTables.java b/services/src/main/java/io/druid/cli/CreateTables.java index 209d98a6a7f..dc689d4fed8 100644 --- a/services/src/main/java/io/druid/cli/CreateTables.java +++ b/services/src/main/java/io/druid/cli/CreateTables.java @@ -27,9 +27,9 @@ import com.google.inject.Module; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.airlift.command.Option; -import io.druid.db.MetadataDbConnector; -import io.druid.db.MetadataDbConnectorConfig; -import io.druid.db.MetadataTablesConfig; +import io.druid.db.MetadataStorageConnector; +import io.druid.db.MetadataStorageConnectorConfig; +import io.druid.db.MetadataStorageTablesConfig; import io.druid.guice.JsonConfigProvider; import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; @@ -71,7 +71,7 @@ public class CreateTables extends GuiceRunnable public void configure(Binder binder) { JsonConfigProvider.bindInstance( - binder, Key.get(MetadataDbConnectorConfig.class), new MetadataDbConnectorConfig() + binder, Key.get(MetadataStorageConnectorConfig.class), new MetadataStorageConnectorConfig() { @Override public String getConnectURI() @@ -93,7 +93,7 @@ public class CreateTables extends GuiceRunnable } ); JsonConfigProvider.bindInstance( - binder, Key.get(MetadataTablesConfig.class), MetadataTablesConfig.fromBase(base) + binder, Key.get(MetadataStorageTablesConfig.class), MetadataStorageTablesConfig.fromBase(base) ); JsonConfigProvider.bindInstance( binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1) @@ -107,7 +107,7 @@ public class CreateTables extends GuiceRunnable public void run() { final Injector injector = makeInjector(); - MetadataDbConnector dbConnector = injector.getInstance(MetadataDbConnector.class); + MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class); dbConnector.createSegmentTable(); dbConnector.createRulesTable(); dbConnector.createConfigTable();