From 8fac0b0b7ae3fe5456bef0200ddfdb758ae8c01d Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Wed, 12 Feb 2014 11:43:28 +0000 Subject: [PATCH 1/6] Fixed automatic tables creation for PostgreSQL --- .../main/java/io/druid/db/DbConnector.java | 51 ++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 585276ade91..1e9b81be843 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -44,7 +44,11 @@ public class DbConnector dbi, segmentTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));" + + "CREATE INDEX ON %1$s(dataSource);"+ + "CREATE INDEX ON %1$s(used);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", segmentTableName ) ); @@ -56,7 +60,10 @@ public class DbConnector dbi, ruleTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));"+ + "CREATE INDEX ON %1$s(dataSource);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", ruleTableName ) ); @@ -68,7 +75,9 @@ public class DbConnector dbi, configTableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", + isPostgreSQL(dbi) ? + "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))": + "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", configTableName ) ); @@ -144,16 +153,20 @@ public class DbConnector @Override public Void withHandle(Handle handle) throws Exception { - if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) { - List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); - - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } + List> table; + if ( isPostgreSQL(dbi) ) { + table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName)); + } else { + table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); } + + if (table.isEmpty()) { + log.info("Creating table[%s]", tableName); + handle.createStatement(sql).execute(); + } else { + log.info("Table[%s] existed: [%s]", tableName, table); + } + return null; } } @@ -164,6 +177,20 @@ public class DbConnector } } + protected static Boolean isPostgreSQL(final IDBI dbi) + { + return dbi.withHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); + } + } + ); + } + private final Supplier config; private final Supplier dbTables; private final DBI dbi; From d089f05127899c76ece7599adc4dfaacc4842d04 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Wed, 12 Feb 2014 12:27:31 +0000 Subject: [PATCH 2/6] Improved support for PostgreSQL in config management --- .../main/java/io/druid/common/config/ConfigManager.java | 9 ++++++++- common/src/main/java/io/druid/db/DbConnector.java | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java index 172c82a6e1c..308fd392fd3 100644 --- a/common/src/main/java/io/druid/common/config/ConfigManager.java +++ b/common/src/main/java/io/druid/common/config/ConfigManager.java @@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import org.joda.time.Duration; import org.skife.jdbi.v2.Handle; @@ -79,7 +80,13 @@ public class ConfigManager this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable); this.insertStatement = String.format( - "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", + DbConnector.isPostgreSQL(dbi) ? + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" + + " INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;" : + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", configTable ); } diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 1e9b81be843..1ed4e279df8 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -177,7 +177,7 @@ public class DbConnector } } - protected static Boolean isPostgreSQL(final IDBI dbi) + public static Boolean isPostgreSQL(final IDBI dbi) { return dbi.withHandle( new HandleCallback() From 4dce0075774fcb1430a5e3dfb5a942d936c3e968 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Wed, 12 Feb 2014 12:30:32 +0000 Subject: [PATCH 3/6] Use DbConnector.isPostgreSQL everywhere we used getDatabaseProductName before --- .../java/io/druid/segment/realtime/DbSegmentPublisher.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java index 8ef9dfd193d..272c507cfd2 100644 --- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -83,15 +84,15 @@ public class DbSegmentPublisher implements SegmentPublisher public Void withHandle(Handle handle) throws Exception { String statement; - if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) { + if (DbConnector.isPostgreSQL(dbi)) { statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", config.getSegmentsTable() ); } else { statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", config.getSegmentsTable() ); From 4b67dbff5f15b820f796f983cf7b27f2dc87d19e Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Mon, 17 Feb 2014 17:54:59 +0000 Subject: [PATCH 4/6] Optimize DbSegmentPublisher.publishSegment --- .../segment/realtime/DbSegmentPublisher.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java index 272c507cfd2..b97738aa91a 100644 --- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java @@ -41,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher private final ObjectMapper jsonMapper; private final DbTablesConfig config; private final IDBI dbi; + private final String statement; @Inject public DbSegmentPublisher( @@ -52,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher this.jsonMapper = jsonMapper; this.config = config; this.dbi = dbi; + + if (DbConnector.isPostgreSQL(dbi)) { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } else { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } } public void publishSegment(final DataSegment segment) throws IOException @@ -83,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher @Override public Void withHandle(Handle handle) throws Exception { - String statement; - if (DbConnector.isPostgreSQL(dbi)) { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } else { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } - handle.createStatement(statement) .bind("id", segment.getIdentifier()) .bind("dataSource", segment.getDataSource()) From a835db2a3c928af8914a1502f2a500a5a89ca5c2 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Tue, 25 Feb 2014 23:03:13 +0100 Subject: [PATCH 5/6] Added support for task storage in PostgreSQL --- .../main/java/io/druid/db/DbConnector.java | 79 +++++++++++++------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 1ed4e279df8..ca6b043146d 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -45,7 +45,7 @@ public class DbConnector segmentTableName, String.format( isPostgreSQL(dbi) ? - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));" + + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + "CREATE INDEX ON %1$s(dataSource);"+ "CREATE INDEX ON %1$s(used);": "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", @@ -61,7 +61,7 @@ public class DbConnector ruleTableName, String.format( isPostgreSQL(dbi) ? - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));"+ + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ "CREATE INDEX ON %1$s(dataSource);": "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", ruleTableName @@ -89,16 +89,27 @@ public class DbConnector dbi, taskTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id varchar(255) NOT NULL,\n" + + " created_date TEXT NOT NULL,\n" + + " datasource varchar(255) NOT NULL,\n" + + " payload bytea NOT NULL,\n" + + " status_payload bytea NOT NULL,\n" + + " active SMALLINT NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX ON %1$s(active, created_date);": + "CREATE TABLE `%s` (\n" + + " `id` varchar(255) NOT NULL,\n" + + " `created_date` tinytext NOT NULL,\n" + + " `datasource` varchar(255) NOT NULL,\n" + + " `payload` longblob NOT NULL,\n" + + " `status_payload` longblob NOT NULL,\n" + + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (`id`),\n" + + " KEY (active, created_date(100))\n" + + ")", taskTableName ) ); @@ -110,13 +121,21 @@ public class DbConnector dbi, taskLogsTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " log_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `log_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLogsTableName ) ); @@ -128,13 +147,21 @@ public class DbConnector dbi, taskLocksTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " lock_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `lock_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLocksTableName ) ); From e40725d5f384cb4e6fe81dc0537d9b1e3bf84125 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Wed, 26 Feb 2014 00:47:53 +0100 Subject: [PATCH 6/6] Added support for PostgreSQL on overlord nodes --- .../src/main/java/io/druid/db/DbConnector.java | 8 +++++++- .../overlord/IndexerDBCoordinator.java | 18 +++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index ca6b043146d..02b2bac2b63 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import javax.sql.DataSource; +import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -212,12 +213,17 @@ public class DbConnector @Override public Boolean withHandle(Handle handle) throws Exception { - return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); + return isPostgreSQL(handle); } } ); } + public static Boolean isPostgreSQL(final Handle handle) throws SQLException + { + return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); + } + private final Supplier config; private final Supplier dbTables; private final DBI dbi; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java index 7e5f3ef48dd..dc02ee9d4ef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java @@ -28,6 +28,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; @@ -179,8 +180,11 @@ public class IndexerDBCoordinator try { handle.createStatement( String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + DbConnector.isPostgreSQL(handle) ? + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)": + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", dbTables.getSegmentsTable() ) ) @@ -196,7 +200,9 @@ public class IndexerDBCoordinator .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); - } catch (Exception e) { + } catch(SQLException e) { + throw new IOException(e); + } catch(Exception e) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); } else { @@ -293,11 +299,13 @@ public class IndexerDBCoordinator new HandleCallback>() { @Override - public List withHandle(Handle handle) throws IOException + public List withHandle(Handle handle) throws IOException, SQLException { return handle.createQuery( String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + DbConnector.isPostgreSQL(handle)? + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0": + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", dbTables.getSegmentsTable() ) )