diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java index 172c82a6e1c..308fd392fd3 100644 --- a/common/src/main/java/io/druid/common/config/ConfigManager.java +++ b/common/src/main/java/io/druid/common/config/ConfigManager.java @@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import org.joda.time.Duration; import org.skife.jdbi.v2.Handle; @@ -79,7 +80,13 @@ public class ConfigManager this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable); this.insertStatement = String.format( - "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", + DbConnector.isPostgreSQL(dbi) ? + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" + + " INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;" : + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", configTable ); } diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 585276ade91..02b2bac2b63 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import javax.sql.DataSource; +import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -44,7 +45,11 @@ public class DbConnector dbi, segmentTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + + "CREATE INDEX ON %1$s(dataSource);"+ + "CREATE INDEX ON %1$s(used);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", segmentTableName ) ); @@ -56,7 +61,10 @@ public class DbConnector dbi, ruleTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ + "CREATE INDEX ON %1$s(dataSource);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", ruleTableName ) ); @@ -68,7 +76,9 @@ public class DbConnector dbi, configTableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", + isPostgreSQL(dbi) ? + "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))": + "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", configTableName ) ); @@ -80,16 +90,27 @@ public class DbConnector dbi, taskTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id varchar(255) NOT NULL,\n" + + " created_date TEXT NOT NULL,\n" + + " datasource varchar(255) NOT NULL,\n" + + " payload bytea NOT NULL,\n" + + " status_payload bytea NOT NULL,\n" + + " active SMALLINT NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX ON %1$s(active, created_date);": + "CREATE TABLE `%s` (\n" + + " `id` varchar(255) NOT NULL,\n" + + " `created_date` tinytext NOT NULL,\n" + + " `datasource` varchar(255) NOT NULL,\n" + + " `payload` longblob NOT NULL,\n" + + " `status_payload` longblob NOT NULL,\n" + + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (`id`),\n" + + " KEY (active, created_date(100))\n" + + ")", taskTableName ) ); @@ -101,13 +122,21 @@ public class DbConnector dbi, taskLogsTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " log_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `log_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLogsTableName ) ); @@ -119,13 +148,21 @@ public class DbConnector dbi, taskLocksTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " lock_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `lock_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLocksTableName ) ); @@ -144,16 +181,20 @@ public class DbConnector @Override public Void withHandle(Handle handle) throws Exception { - if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) { - List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); - - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } + List> table; + if ( isPostgreSQL(dbi) ) { + table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName)); + } else { + table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); } + + if (table.isEmpty()) { + log.info("Creating table[%s]", tableName); + handle.createStatement(sql).execute(); + } else { + log.info("Table[%s] existed: [%s]", tableName, table); + } + return null; } } @@ -164,6 +205,25 @@ public class DbConnector } } + public static Boolean isPostgreSQL(final IDBI dbi) + { + return dbi.withHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return isPostgreSQL(handle); + } + } + ); + } + + public static Boolean isPostgreSQL(final Handle handle) throws SQLException + { + return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); + } + private final Supplier config; private final Supplier dbTables; private final DBI dbi; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java index 7e5f3ef48dd..dc02ee9d4ef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java @@ -28,6 +28,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; @@ -179,8 +180,11 @@ public class IndexerDBCoordinator try { handle.createStatement( String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + DbConnector.isPostgreSQL(handle) ? + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)": + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", dbTables.getSegmentsTable() ) ) @@ -196,7 +200,9 @@ public class IndexerDBCoordinator .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); - } catch (Exception e) { + } catch(SQLException e) { + throw new IOException(e); + } catch(Exception e) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); } else { @@ -293,11 +299,13 @@ public class IndexerDBCoordinator new HandleCallback>() { @Override - public List withHandle(Handle handle) throws IOException + public List withHandle(Handle handle) throws IOException, SQLException { return handle.createQuery( String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + DbConnector.isPostgreSQL(handle)? + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0": + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", dbTables.getSegmentsTable() ) ) diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java index 8ef9dfd193d..b97738aa91a 100644 --- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher private final ObjectMapper jsonMapper; private final DbTablesConfig config; private final IDBI dbi; + private final String statement; @Inject public DbSegmentPublisher( @@ -51,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher this.jsonMapper = jsonMapper; this.config = config; this.dbi = dbi; + + if (DbConnector.isPostgreSQL(dbi)) { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } else { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } } public void publishSegment(final DataSegment segment) throws IOException @@ -82,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher @Override public Void withHandle(Handle handle) throws Exception { - String statement; - if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } else { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } - handle.createStatement(statement) .bind("id", segment.getIdentifier()) .bind("dataSource", segment.getDataSource())