diff --git a/common/pom.xml b/common/pom.xml
index 03e902df7c5..d54a3486cbf 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -46,9 +46,8 @@
commons-codec
- commons-dbcp
- commons-dbcp
- 1.4
+ org.apache.commons
+ commons-dbcp2
commons-pool
diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java
index dac69a1543f..094d579aad0 100644
--- a/common/src/main/java/io/druid/common/config/ConfigManager.java
+++ b/common/src/main/java/io/druid/common/config/ConfigManager.java
@@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbTablesConfig;
+import io.druid.metadata.MetadataStorageConnector;
+import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;
@@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
- private final DbConnector dbConnector;
+ private final MetadataStorageConnector dbConnector;
private final Supplier config;
private final ScheduledExecutorService exec;
@@ -58,7 +58,7 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
- public ConfigManager(DbConnector dbConnector, Supplier dbTables, Supplier config)
+ public ConfigManager(MetadataStorageConnector dbConnector, Supplier dbTables, Supplier config)
{
this.dbConnector = dbConnector;
this.config = config;
diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java
deleted file mode 100644
index 1e783399270..00000000000
--- a/common/src/main/java/io/druid/db/DbConnector.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.db;
-
-import com.google.common.base.Supplier;
-import com.google.inject.Inject;
-import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
-
-import javax.sql.DataSource;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class DbConnector
-{
- private static final Logger log = new Logger(DbConnector.class);
-
- public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- segmentTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
- "CREATE INDEX ON %1$s(dataSource);"+
- "CREATE INDEX ON %1$s(used);":
- "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
- segmentTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- ruleTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
- "CREATE INDEX ON %1$s(dataSource);":
- "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
- ruleTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- configTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
- "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
- configTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id varchar(255) NOT NULL,\n"
- + " created_date TEXT NOT NULL,\n"
- + " datasource varchar(255) NOT NULL,\n"
- + " payload bytea NOT NULL,\n"
- + " status_payload bytea NOT NULL,\n"
- + " active SMALLINT NOT NULL DEFAULT '0',\n"
- + " PRIMARY KEY (id)\n"
- + ");\n" +
- "CREATE INDEX ON %1$s(active, created_date);":
- "CREATE TABLE `%s` (\n"
- + " `id` varchar(255) NOT NULL,\n"
- + " `created_date` tinytext NOT NULL,\n"
- + " `datasource` varchar(255) NOT NULL,\n"
- + " `payload` longblob NOT NULL,\n"
- + " `status_payload` longblob NOT NULL,\n"
- + " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY (active, created_date(100))\n"
- + ")",
- taskTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskLogsTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id bigserial NOT NULL,\n"
- + " task_id varchar(255) DEFAULT NULL,\n"
- + " log_payload bytea,\n"
- + " PRIMARY KEY (id)\n"
- + ");\n"+
- "CREATE INDEX ON %1$s(task_id);":
- "CREATE TABLE `%s` (\n"
- + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
- + " `task_id` varchar(255) DEFAULT NULL,\n"
- + " `log_payload` longblob,\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY `task_id` (`task_id`)\n"
- + ")",
- taskLogsTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskLocksTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id bigserial NOT NULL,\n"
- + " task_id varchar(255) DEFAULT NULL,\n"
- + " lock_payload bytea,\n"
- + " PRIMARY KEY (id)\n"
- + ");\n"+
- "CREATE INDEX ON %1$s(task_id);":
- "CREATE TABLE `%s` (\n"
- + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
- + " `task_id` varchar(255) DEFAULT NULL,\n"
- + " `lock_payload` longblob,\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY `task_id` (`task_id`)\n"
- + ")",
- taskLocksTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTable(
- final IDBI dbi,
- final String tableName,
- final String sql,
- final boolean isPostgreSQL
- )
- {
- try {
- dbi.withHandle(
- new HandleCallback()
- {
- @Override
- public Void withHandle(Handle handle) throws Exception
- {
- List
-
- mysql
- mysql-connector-java
- 5.1.18
-
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
index 5280e394f6f..d76461ab171 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
@@ -81,7 +81,7 @@ public class SegmentInsertAction implements TaskAction>
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- final Set retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
+ final Set retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
index fb296871d2d..3aaca159ccd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
@@ -68,7 +68,7 @@ public class SegmentListUnusedAction implements TaskAction>
@Override
public List perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
+ return toolbox.getIndexerMetadataStorageCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
index ff9cfc492ca..82d16daf3ae 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
@@ -68,7 +68,7 @@ public class SegmentListUsedAction implements TaskAction>
@Override
public List perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
+ return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
index 4356c80dc59..2abd53fea08 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -42,7 +42,7 @@ public class SegmentMetadataUpdateAction implements TaskAction
) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
+ toolbox.getIndexerMetadataStorageCoordinator().updateSegmentMetadata(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
index 54258df1c2d..62bc6142c6d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
@@ -59,7 +59,7 @@ public class SegmentNukeAction implements TaskAction
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- toolbox.getIndexerDBCoordinator().deleteSegments(segments);
+ toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
index d9b0520f40b..5b055b4ba90 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
@@ -27,7 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.overlord.IndexerDBCoordinator;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;
@@ -37,18 +37,18 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
- private final IndexerDBCoordinator indexerDBCoordinator;
+ private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
- IndexerDBCoordinator indexerDBCoordinator,
+ IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter
)
{
this.taskLockbox = taskLockbox;
- this.indexerDBCoordinator = indexerDBCoordinator;
+ this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
}
@@ -57,9 +57,9 @@ public class TaskActionToolbox
return taskLockbox;
}
- public IndexerDBCoordinator getIndexerDBCoordinator()
+ public IndexerMetadataStorageCoordinator getIndexerMetadataStorageCoordinator()
{
- return indexerDBCoordinator;
+ return indexerMetadataStorageCoordinator;
}
public ServiceEmitter getEmitter()
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
deleted file mode 100644
index 05576184655..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.overlord;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import com.metamx.common.RetryUtils;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.emitter.EmittingLogger;
-import com.mysql.jdbc.exceptions.MySQLTransientException;
-import io.druid.db.DbConnector;
-import io.druid.db.DbTablesConfig;
-import io.druid.indexing.common.TaskLock;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.actions.TaskAction;
-import io.druid.indexing.common.config.TaskStorageConfig;
-import io.druid.indexing.common.task.Task;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.exceptions.DBIException;
-import org.skife.jdbi.v2.exceptions.StatementException;
-import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
-
-import java.sql.SQLException;
-import java.sql.SQLRecoverableException;
-import java.sql.SQLTransientException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-public class DbTaskStorage implements TaskStorage
-{
- private final ObjectMapper jsonMapper;
- private final DbConnector dbConnector;
- private final DbTablesConfig dbTables;
- private final IDBI dbi;
- private final TaskStorageConfig config;
-
- private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
-
- @Inject
- public DbTaskStorage(
- final ObjectMapper jsonMapper,
- final DbConnector dbConnector,
- final DbTablesConfig dbTables,
- final IDBI dbi,
- final TaskStorageConfig config
- )
- {
- this.jsonMapper = jsonMapper;
- this.dbConnector = dbConnector;
- this.dbTables = dbTables;
- this.dbi = dbi;
- this.config = config;
- }
-
- @LifecycleStart
- public void start()
- {
- dbConnector.createTaskTables();
- }
-
- @LifecycleStop
- public void stop()
- {
- // do nothing
- }
-
- @Override
- public void insert(final Task task, final TaskStatus status) throws TaskExistsException
- {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkNotNull(status, "status");
- Preconditions.checkArgument(
- task.getId().equals(status.getId()),
- "Task/Status ID mismatch[%s/%s]",
- task.getId(),
- status.getId()
- );
-
- log.info("Inserting task %s with status: %s", task.getId(), status);
-
- try {
- retryingHandle(
- new HandleCallback()
- {
- @Override
- public Void withHandle(Handle handle) throws Exception
- {
- handle.createStatement(
- String.format(
- "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
- dbTables.getTasksTable()
- )
- )
- .bind("id", task.getId())
- .bind("created_date", new DateTime().toString())
- .bind("datasource", task.getDataSource())
- .bind("payload", jsonMapper.writeValueAsBytes(task))
- .bind("active", status.isRunnable() ? 1 : 0)
- .bind("status_payload", jsonMapper.writeValueAsBytes(status))
- .execute();
-
- return null;
- }
- }
- );
- }
- catch (Exception e) {
- final boolean isStatementException = e instanceof StatementException ||
- (e instanceof CallbackFailedException
- && e.getCause() instanceof StatementException);
- if (isStatementException && getTask(task.getId()).isPresent()) {
- throw new TaskExistsException(task.getId(), e);
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public void setStatus(final TaskStatus status)
- {
- Preconditions.checkNotNull(status, "status");
-
- log.info("Updating task %s to status: %s", status.getId(), status);
-
- int updated = retryingHandle(
- new HandleCallback()
- {
- @Override
- public Integer withHandle(Handle handle) throws Exception
- {
- return handle.createStatement(
- String.format(
- "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
- dbTables.getTasksTable()
- )
- )
- .bind("id", status.getId())
- .bind("active", status.isRunnable() ? 1 : 0)
- .bind("status_payload", jsonMapper.writeValueAsBytes(status))
- .execute();
- }
- }
- );
-
- if (updated != 1) {
- throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
- }
- }
-
- @Override
- public Optional getTask(final String taskid)
- {
- return retryingHandle(
- new HandleCallback>()
- {
- @Override
- public Optional withHandle(Handle handle) throws Exception
- {
- final List