From a835db2a3c928af8914a1502f2a500a5a89ca5c2 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Tue, 25 Feb 2014 23:03:13 +0100 Subject: [PATCH] Added support for task storage in PostgreSQL --- .../main/java/io/druid/db/DbConnector.java | 79 +++++++++++++------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 1ed4e279df8..ca6b043146d 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -45,7 +45,7 @@ public class DbConnector segmentTableName, String.format( isPostgreSQL(dbi) ? - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));" + + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + "CREATE INDEX ON %1$s(dataSource);"+ "CREATE INDEX ON %1$s(used);": "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", @@ -61,7 +61,7 @@ public class DbConnector ruleTableName, String.format( isPostgreSQL(dbi) ? - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload TEXT NOT NULL, PRIMARY KEY (id));"+ + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ "CREATE INDEX ON %1$s(dataSource);": "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", ruleTableName @@ -89,16 +89,27 @@ public class DbConnector dbi, taskTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id varchar(255) NOT NULL,\n" + + " created_date TEXT NOT NULL,\n" + + " datasource varchar(255) NOT NULL,\n" + + " payload bytea NOT NULL,\n" + + " status_payload bytea NOT NULL,\n" + + " active SMALLINT NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX ON %1$s(active, created_date);": + "CREATE TABLE `%s` (\n" + + " `id` varchar(255) NOT NULL,\n" + + " `created_date` tinytext NOT NULL,\n" + + " `datasource` varchar(255) NOT NULL,\n" + + " `payload` longblob NOT NULL,\n" + + " `status_payload` longblob NOT NULL,\n" + + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (`id`),\n" + + " KEY (active, created_date(100))\n" + + ")", taskTableName ) ); @@ -110,13 +121,21 @@ public class DbConnector dbi, taskLogsTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " log_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `log_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLogsTableName ) ); @@ -128,13 +147,21 @@ public class DbConnector dbi, taskLocksTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " lock_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `lock_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLocksTableName ) );