diff --git a/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java b/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java index fee4c7b41ed..6bcb34d0d89 100644 --- a/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java +++ b/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java @@ -36,6 +36,9 @@ import java.util.Map; public class MySQLConnector extends SQLMetadataConnector { private static final Logger log = new Logger(MySQLConnector.class); + private static final String PAYLOAD_TYPE = "LONGBLOB"; + private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT"; + private final DBI dbi; @Inject @@ -54,6 +57,18 @@ public class MySQLConnector extends SQLMetadataConnector }); } + @Override + protected String getPayloadType() + { + return PAYLOAD_TYPE; + } + + @Override + protected String getSerialType() + { + return SERIAL_TYPE; + } + @Override public void createTable(final IDBI dbi, final String tableName, final String sql) { @@ -81,107 +96,6 @@ public class MySQLConnector extends SQLMetadataConnector } } - @Override - public void createSegmentTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, " - + "start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, " - + "used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", - tableName - ) - ); - } - - @Override - public void createRulesTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", - tableName - ) - ); - } - - @Override - public void createConfigTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", - tableName - ) - ); - } - - @Override - public void createTaskTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", - tableName - ) - ); - } - - @Override - public void createTaskLogTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", - tableName - ) - ); - } - - @Override - public void createTaskLockTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", - tableName - ) - ); - } - @Override public Void insertOrUpdate( final String tableName, @@ -197,10 +111,14 @@ public class MySQLConnector extends SQLMetadataConnector @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement(String.format( - "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", - tableName, keyColumn, valueColumn - )) + handle.createStatement( + String.format( + "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", + tableName, + keyColumn, + valueColumn + ) + ) .bind("key", key) .bind("value", value) .execute(); diff --git a/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java b/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java index b52e4d7ac44..3778e64bfb8 100644 --- a/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java +++ b/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java @@ -36,6 +36,9 @@ import java.util.Map; public class PostgreSQLConnector extends SQLMetadataConnector { private static final Logger log = new Logger(PostgreSQLConnector.class); + private static final String PAYLOAD_TYPE = "BYTEA"; + private static final String SERIAL_TYPE = "BIGSERIAL"; + private final DBI dbi; @Inject @@ -46,6 +49,17 @@ public class PostgreSQLConnector extends SQLMetadataConnector } + @Override + protected String getPayloadType() { + return PAYLOAD_TYPE; + } + + @Override + protected String getSerialType() + { + return SERIAL_TYPE; + } + @Override public void createTable(final IDBI dbi, final String tableName, final String sql) { @@ -73,110 +87,6 @@ public class PostgreSQLConnector extends SQLMetadataConnector } } - @Override - public void createSegmentTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, " - + "start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, " - + "used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + - "CREATE INDEX ON %1$s(dataSource);"+ - "CREATE INDEX ON %1$s(used);", - tableName - ) - ); - } - - @Override - public void createRulesTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ - "CREATE INDEX ON %1$s(dataSource);", - tableName - ) - ); - } - - @Override - public void createConfigTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))", - tableName - ) - ); - } - - @Override - public void createTaskTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id varchar(255) NOT NULL,\n" - + " created_date TEXT NOT NULL,\n" - + " datasource varchar(255) NOT NULL,\n" - + " payload bytea NOT NULL,\n" - + " status_payload bytea NOT NULL,\n" - + " active SMALLINT NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (id)\n" - + ");\n" + - "CREATE INDEX ON %1$s(active, created_date);", - tableName - ) - ); - } - - @Override - public void createTaskLogTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id bigserial NOT NULL,\n" - + " task_id varchar(255) DEFAULT NULL,\n" - + " log_payload bytea,\n" - + " PRIMARY KEY (id)\n" - + ");\n"+ - "CREATE INDEX ON %1$s(task_id);", - tableName - ) - ); - } - - @Override - public void createTaskLockTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id bigserial NOT NULL,\n" - + " task_id varchar(255) DEFAULT NULL,\n" - + " lock_payload bytea,\n" - + " PRIMARY KEY (id)\n" - + ");\n"+ - "CREATE INDEX ON %1$s(task_id);", - tableName - ) - ); - } - @Override public Void insertOrUpdate( final String tableName, @@ -192,14 +102,18 @@ public class PostgreSQLConnector extends SQLMetadataConnector @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement(String.format( - "BEGIN;\n" + - "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + - "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + - " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + - "COMMIT;", - tableName, keyColumn, valueColumn - )) + handle.createStatement( + String.format( + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + + " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;", + tableName, + keyColumn, + valueColumn + ) + ) .bind("key", key) .bind("value", value) .execute(); diff --git a/server/src/main/java/io/druid/db/DerbyConnector.java b/server/src/main/java/io/druid/db/DerbyConnector.java index 29586c93db5..88f71d4d666 100644 --- a/server/src/main/java/io/druid/db/DerbyConnector.java +++ b/server/src/main/java/io/druid/db/DerbyConnector.java @@ -22,7 +22,6 @@ package io.druid.db; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Supplier; import com.google.inject.Inject; -import com.metamx.common.logger.Logger; import org.apache.derby.drda.NetworkServerControl; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.tweak.ConnectionFactory; @@ -31,6 +30,7 @@ import java.net.InetAddress; public class DerbyConnector extends SQLMetadataConnector { + private static final String SERIAL_TYPE = "BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)"; private final DBI dbi; @Inject @@ -40,6 +40,12 @@ public class DerbyConnector extends SQLMetadataConnector this.dbi = new DBI(getConnectionFactory("druidDerbyDb")); } + @Override + protected String getSerialType() + { + return SERIAL_TYPE; + } + @Override public DBI getDBI() { return dbi; } diff --git a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java b/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java index e8ba102fc2a..a6863991c53 100644 --- a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java @@ -28,6 +28,7 @@ import io.druid.guice.annotations.Json; import io.druid.server.coordinator.rules.Rule; import org.skife.jdbi.v2.IDBI; +import java.sql.Blob; import java.util.List; import java.util.Map; @@ -48,17 +49,14 @@ public class DerbyMetadataRuleManager extends SQLMetadataRuleManager @Override protected List getRules(Map stringObjectMap) { - List rules = null; try { - java.sql.Clob payload = (java.sql.Clob)stringObjectMap.get("payload"); - rules = jsonMapper.readValue( - payload.getSubString(1, (int)payload.length()), new TypeReference>() - { - } + Blob payload = (Blob)stringObjectMap.get("payload"); + List rules = jsonMapper.readValue( + payload.getBinaryStream(), new TypeReference>() {} ); + return rules; } catch (Exception e) { throw Throwables.propagate(e); } - return rules; } } diff --git a/server/src/main/java/io/druid/db/SQLMetadataConnector.java b/server/src/main/java/io/druid/db/SQLMetadataConnector.java index 528414eb802..f3bf2ae6cd8 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/db/SQLMetadataConnector.java @@ -37,9 +37,11 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; -public class SQLMetadataConnector implements MetadataStorageConnector +public abstract class SQLMetadataConnector implements MetadataStorageConnector { private static final Logger log = new Logger(SQLMetadataConnector.class); + private static final String PAYLOAD_TYPE = "BLOB"; + private final Supplier config; private final Supplier dbTables; @@ -50,6 +52,12 @@ public class SQLMetadataConnector implements MetadataStorageConnector this.dbTables = dbTables; } + protected String getPayloadType() { + return PAYLOAD_TYPE; + } + + protected abstract String getSerialType(); + public void createTable(final IDBI dbi, final String tableName, final String sql) { try { @@ -99,25 +107,40 @@ public class SQLMetadataConnector implements MetadataStorageConnector dbi, tableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " - + "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, " - + "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", - tableName + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " start VARCHAR(255) NOT NULL,\n" + + " \"end\" VARCHAR(255) NOT NULL,\n" + + " partitioned BOOLEAN NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" + + " used BOOLEAN NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);\n" + + "CREATE INDEX idx_%1$s_used ON %1$s(used);", + tableName, getPayloadType() ) ); - createIndex(dbi, tableName, "segment_dataSource", "dataSource"); - createIndex(dbi, tableName, "segment_used", "used"); } public void createRulesTable(final IDBI dbi, final String tableName) { - System.out.println("creating rule table"); createTable( dbi, tableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", - tableName + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", + tableName, getPayloadType() ) ); createIndex(dbi, tableName, "rules_dataSource", "dataSource"); @@ -129,8 +152,12 @@ public class SQLMetadataConnector implements MetadataStorageConnector dbi, tableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", - tableName + "CREATE TABLE %1$s (\n" + + " name VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY(name)\n" + + ");", + tableName, getPayloadType() ) ); } @@ -140,13 +167,20 @@ public class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " - + "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, " - + "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))", - tableName + String.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " datasource VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " status_payload %2$s NOT NULL,\n" + + " active BOOLEAN NOT NULL DEFAULT FALSE,\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);", + tableName, getPayloadType() ) ); - createIndex(dbi, tableName, "task_active_created_date", "active, created_date"); } public void createTaskLogTable(final IDBI dbi, final String tableName) @@ -155,13 +189,16 @@ public class SQLMetadataConnector implements MetadataStorageConnector dbi, tableName, String.format( - "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " - + "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); " - + "CREATE INDEX task_log_task_id ON %1$s(task_id)", - tableName + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " task_id VARCHAR(255) DEFAULT NULL,\n" + + " log_payload %3$s,\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", + tableName, getSerialType(), getPayloadType() ) ); - createIndex(dbi, tableName, "task_log_task_id", "task_id"); } public void createTaskLockTable(final IDBI dbi, final String tableName) @@ -170,12 +207,16 @@ public class SQLMetadataConnector implements MetadataStorageConnector dbi, tableName, String.format( - "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " - + "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))", - tableName + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " task_id VARCHAR(255) DEFAULT NULL,\n" + + " lock_payload %3$s,\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", + tableName, getSerialType(), getPayloadType() ) ); - createIndex(dbi, tableName, "task_lock_task_id", "task_id"); } @Override @@ -220,8 +261,7 @@ public class SQLMetadataConnector implements MetadataStorageConnector ); } - /* this method should be overwritten for each type of connector */ - public DBI getDBI() { return null; } + public abstract DBI getDBI(); @Override public void createSegmentTable() {