diff --git a/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java b/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java index 0b7c96d4b0f..cc3274d5acf 100644 --- a/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java +++ b/mysql-storage/src/main/java/io/druid/storage/mysql/MySQLConnector.java @@ -25,11 +25,13 @@ import com.metamx.common.logger.Logger; import io.druid.db.MetadataStorageConnectorConfig; import io.druid.db.MetadataStorageTablesConfig; import io.druid.db.SQLMetadataConnector; +import org.apache.commons.dbcp.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; +import javax.sql.DataSource; import java.util.List; import java.util.Map; @@ -54,6 +56,7 @@ public class MySQLConnector extends SQLMetadataConnector }); } + @Override public void createTable(final IDBI dbi, final String tableName, final String sql) { try { @@ -80,6 +83,7 @@ public class MySQLConnector extends SQLMetadataConnector } } + @Override public void createSegmentTable(final IDBI dbi, final String tableName) { createTable( @@ -94,6 +98,7 @@ public class MySQLConnector extends SQLMetadataConnector ); } + @Override public void createRulesTable(final IDBI dbi, final String tableName) { createTable( @@ -106,6 +111,7 @@ public class MySQLConnector extends SQLMetadataConnector ); } + @Override public void createConfigTable(final IDBI dbi, final String tableName) { createTable( @@ -118,6 +124,7 @@ public class MySQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskTable(final IDBI dbi, final String tableName) { createTable( @@ -139,6 +146,7 @@ public class MySQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskLogTable(final IDBI dbi, final String tableName) { createTable( @@ -157,6 +165,7 @@ public class MySQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskLockTable(final IDBI dbi, final String tableName) { createTable( @@ -175,14 +184,34 @@ public class MySQLConnector extends SQLMetadataConnector ); } - public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn) + @Override + public Void insertOrUpdate( + final String tableName, + final String keyColumn, + final String valueColumn, + final String key, + final byte[] value + ) throws Exception { - return String.format( - "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", - tableName, keyColumn, valueColumn + return getDBI().withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement(String.format( + "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", + tableName, keyColumn, valueColumn + )) + .bind("key", key) + .bind("value", value) + .execute(); + return null; + } + } ); } + @Override public DBI getDBI() { return dbi; } - } diff --git a/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java b/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java index fd36ec172c0..fd688523657 100644 --- a/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java +++ b/postgres-storage/src/main/java/io/druid/storage/postgres/PostgreSQLConnector.java @@ -25,11 +25,13 @@ import com.metamx.common.logger.Logger; import io.druid.db.MetadataStorageConnectorConfig; import io.druid.db.MetadataStorageTablesConfig; import io.druid.db.SQLMetadataConnector; +import org.apache.commons.dbcp.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; +import javax.sql.DataSource; import java.util.List; import java.util.Map; @@ -46,6 +48,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector } + @Override public void createTable(final IDBI dbi, final String tableName, final String sql) { try { @@ -72,6 +75,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector } } + @Override public void createSegmentTable(final IDBI dbi, final String tableName) { createTable( @@ -88,6 +92,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } + @Override public void createRulesTable(final IDBI dbi, final String tableName) { createTable( @@ -101,6 +106,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } + @Override public void createConfigTable(final IDBI dbi, final String tableName) { createTable( @@ -113,6 +119,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskTable(final IDBI dbi, final String tableName) { createTable( @@ -134,6 +141,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskLogTable(final IDBI dbi, final String tableName) { createTable( @@ -152,6 +160,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } + @Override public void createTaskLockTable(final IDBI dbi, final String tableName) { createTable( @@ -170,17 +179,38 @@ public class PostgreSQLConnector extends SQLMetadataConnector ); } - public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn) + @Override + public Void insertOrUpdate( + final String tableName, + final String keyColumn, + final String valueColumn, + final String key, + final byte[] value + ) throws Exception { - return String.format( - "BEGIN;\n" + - "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + - "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + - " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + - "COMMIT;", - tableName, keyColumn, valueColumn + return getDBI().withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement(String.format( + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + + " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;", + tableName, keyColumn, valueColumn + )) + .bind("key", key) + .bind("value", value) + .execute(); + return null; + } + } ); } + @Override public DBI getDBI() { return dbi; } } diff --git a/server/src/main/java/io/druid/db/DerbyConnector.java b/server/src/main/java/io/druid/db/DerbyConnector.java index 8079cce9b32..47c50382f31 100644 --- a/server/src/main/java/io/druid/db/DerbyConnector.java +++ b/server/src/main/java/io/druid/db/DerbyConnector.java @@ -47,181 +47,7 @@ public class DerbyConnector extends SQLMetadataConnector this.dbi = new DBI(getConnectionFactory("druidDerbyDb")); } - public void createTable(final IDBI dbi, final String tableName, final String sql) - { - try { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - List> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase())); - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } - return null; - } - } - ); - } - catch (Exception e) { - log.warn(e, "Exception creating table"); - } - } - - public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - List> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase())); - if (table.isEmpty()) { - handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute(); - } - return null; - } - } - ); - } - - public void createSegmentTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " - + "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, " - + "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", - tableName - ) - ); - createIndex(dbi, tableName, "segment_dataSource", "dataSource"); - createIndex(dbi, tableName, "segment_used", "used"); - } - - public void createRulesTable(final IDBI dbi, final String tableName) - { - System.out.println("creating rule table"); - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", - tableName - ) - ); - createIndex(dbi, tableName, "rules_dataSource", "dataSource"); - } - - public void createConfigTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", - tableName - ) - ); - } - - public void createTaskTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " - + "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, " - + "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))", - tableName - ) - ); - createIndex(dbi, tableName, "task_active_created_date", "active, created_date"); - } - - public void createTaskLogTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " - + "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); " - + "CREATE INDEX task_log_task_id ON %1$s(task_id)", - tableName - ) - ); - createIndex(dbi, tableName, "task_log_task_id", "task_id"); - } - - public void createTaskLockTable(final IDBI dbi, final String tableName) - { - createTable( - dbi, - tableName, - String.format( - "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " - + "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))", - tableName - ) - ); - createIndex(dbi, tableName, "task_lock_task_id", "task_id"); - } - - public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn) - { - return null; - } - @Override - public Void insertOrUpdate( - final String tableName, - final String keyColumn, - final String valueColumn, - final String key, - final byte[] value - ) throws Exception - { - return getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - Connection conn = getDBI().open().getConnection(); - handle.begin(); - conn.setAutoCommit(false); - List> entry = handle.createQuery( - String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn) - ).list(); - if (entry == null || entry.isEmpty()) { - handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)", - tableName, keyColumn, valueColumn)) - .bind("key", key) - .bind("value", value) - .execute(); - } else { - handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key", - tableName, keyColumn, valueColumn)) - .bind("key", key) - .bind("value", value) - .execute(); - } - conn.setAutoCommit(true); - handle.commit(); - return null; - } - } - ); - } - public DBI getDBI() { return dbi; } private ConnectionFactory getConnectionFactory(String dbName) diff --git a/server/src/main/java/io/druid/db/SQLMetadataConnector.java b/server/src/main/java/io/druid/db/SQLMetadataConnector.java index 5189b498ebb..528414eb802 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/db/SQLMetadataConnector.java @@ -21,6 +21,7 @@ package io.druid.db; import com.google.common.base.Supplier; import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; import org.apache.commons.dbcp.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; @@ -30,39 +31,197 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; import javax.sql.DataSource; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.Map; -public abstract class SQLMetadataConnector implements MetadataStorageConnector +public class SQLMetadataConnector implements MetadataStorageConnector { + private static final Logger log = new Logger(SQLMetadataConnector.class); private final Supplier config; private final Supplier dbTables; - protected SQLMetadataConnector(Supplier config, Supplier dbTables) + public SQLMetadataConnector(Supplier config, + Supplier dbTables) { this.config = config; this.dbTables = dbTables; - } - public abstract void createTable(final IDBI dbi, final String tableName, final String sql); + public void createTable(final IDBI dbi, final String tableName, final String sql) + { + try { + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + List> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase())); + if (table.isEmpty()) { + log.info("Creating table[%s]", tableName); + handle.createStatement(sql).execute(); + } else { + log.info("Table[%s] existed: [%s]", tableName, table); + } + return null; + } + } + ); + } + catch (Exception e) { + log.warn(e, "Exception creating table"); + } + } - public abstract void createSegmentTable(final IDBI dbi, final String tableName); + public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) { + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + List> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase())); + if (table.isEmpty()) { + handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute(); + } + return null; + } + } + ); + } - public abstract void createRulesTable(final IDBI dbi, final String tableName); + public void createSegmentTable(final IDBI dbi, final String tableName) + { + createTable( + dbi, + tableName, + String.format( + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " + + "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, " + + "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", + tableName + ) + ); + createIndex(dbi, tableName, "segment_dataSource", "dataSource"); + createIndex(dbi, tableName, "segment_used", "used"); + } - public abstract void createConfigTable(final IDBI dbi, final String tableName); + public void createRulesTable(final IDBI dbi, final String tableName) + { + System.out.println("creating rule table"); + createTable( + dbi, + tableName, + String.format( + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", + tableName + ) + ); + createIndex(dbi, tableName, "rules_dataSource", "dataSource"); + } - public abstract void createTaskTable(final IDBI dbi, final String tableName); + public void createConfigTable(final IDBI dbi, final String tableName) + { + createTable( + dbi, + tableName, + String.format( + "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", + tableName + ) + ); + } - public abstract void createTaskLogTable(final IDBI dbi, final String taskLogsTableName); + public void createTaskTable(final IDBI dbi, final String tableName) + { + createTable( + dbi, + tableName, + String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " + + "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, " + + "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))", + tableName + ) + ); + createIndex(dbi, tableName, "task_active_created_date", "active, created_date"); + } - public abstract void createTaskLockTable(final IDBI dbi, final String taskLocksTableName); + public void createTaskLogTable(final IDBI dbi, final String tableName) + { + createTable( + dbi, + tableName, + String.format( + "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); " + + "CREATE INDEX task_log_task_id ON %1$s(task_id)", + tableName + ) + ); + createIndex(dbi, tableName, "task_log_task_id", "task_id"); + } - public abstract String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn); + public void createTaskLockTable(final IDBI dbi, final String tableName) + { + createTable( + dbi, + tableName, + String.format( + "CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))", + tableName + ) + ); + createIndex(dbi, tableName, "task_lock_task_id", "task_id"); + } - public abstract DBI getDBI(); + @Override + public Void insertOrUpdate( + final String tableName, + final String keyColumn, + final String valueColumn, + final String key, + final byte[] value + ) throws Exception + { + return getDBI().withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + Connection conn = getDBI().open().getConnection(); + handle.begin(); + conn.setAutoCommit(false); + List> entry = handle.createQuery( + String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn) + ).list(); + if (entry == null || entry.isEmpty()) { + handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)", + tableName, keyColumn, valueColumn)) + .bind("key", key) + .bind("value", value) + .execute(); + } else { + handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key", + tableName, keyColumn, valueColumn)) + .bind("key", key) + .bind("value", value) + .execute(); + } + conn.setAutoCommit(true); + handle.commit(); + return null; + } + } + ); + } + + /* this method should be overwritten for each type of connector */ + public DBI getDBI() { return null; } @Override public void createSegmentTable() { @@ -95,33 +254,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector } } - @Override - public Void insertOrUpdate( - final String tableName, - final String keyColumn, - final String valueColumn, - final String key, - final byte[] value - ) throws Exception - { - final String insertOrUpdateStatement = insertOrUpdateStatement(tableName, keyColumn, valueColumn); - - return getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement(insertOrUpdateStatement) - .bind("key", key) - .bind("value", value) - .execute(); - return null; - } - } - ); - } - @Override public byte[] lookup( final String tableName, @@ -171,7 +303,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector protected DataSource getDatasource() { - MetadataStorageConnectorConfig connectorConfig = config.get(); + MetadataStorageConnectorConfig connectorConfig = getConfig(); BasicDataSource dataSource = new BasicDataSource(); dataSource.setUsername(connectorConfig.getUser()); @@ -186,4 +318,5 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector return dataSource; } + }