changed SQLMetadataConnector to a concrete class

This commit is contained in:
jisookim0513 2014-10-21 23:38:26 -07:00
parent 37979282fe
commit a0d9944fa6
4 changed files with 245 additions and 227 deletions

View File

@ -25,11 +25,13 @@ import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
@ -54,6 +56,7 @@ public class MySQLConnector extends SQLMetadataConnector
});
}
@Override
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
@ -80,6 +83,7 @@ public class MySQLConnector extends SQLMetadataConnector
}
}
@Override
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
@ -94,6 +98,7 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
@Override
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
@ -106,6 +111,7 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
@Override
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
@ -118,6 +124,7 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
@ -139,6 +146,7 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
@ -157,6 +165,7 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
@ -175,14 +184,34 @@ public class MySQLConnector extends SQLMetadataConnector
);
}
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return String.format(
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(String.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
))
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
@Override
public DBI getDBI() { return dbi; }
}

View File

@ -25,11 +25,13 @@ import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
@ -46,6 +48,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
}
@Override
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
@ -72,6 +75,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
}
}
@Override
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
@ -88,6 +92,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
@Override
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
@ -101,6 +106,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
@Override
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
@ -113,6 +119,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
@ -134,6 +141,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
@ -152,6 +160,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
@Override
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
@ -170,17 +179,38 @@ public class PostgreSQLConnector extends SQLMetadataConnector
);
}
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return String.format(
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(String.format(
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;",
tableName, keyColumn, valueColumn
))
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
@Override
public DBI getDBI() { return dbi; }
}

View File

@ -47,181 +47,7 @@ public class DerbyConnector extends SQLMetadataConnector
this.dbi = new DBI(getConnectionFactory("druidDerbyDb"));
}
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase()));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase()));
if (table.isEmpty()) {
handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute();
}
return null;
}
}
);
}
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, "
+ "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, "
+ "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "segment_dataSource", "dataSource");
createIndex(dbi, tableName, "segment_used", "used");
}
public void createRulesTable(final IDBI dbi, final String tableName)
{
System.out.println("creating rule table");
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "rules_dataSource", "dataSource");
}
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, "
+ "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, "
+ "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "task_active_created_date", "active, created_date");
}
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); "
+ "CREATE INDEX task_log_task_id ON %1$s(task_id)",
tableName
)
);
createIndex(dbi, tableName, "task_log_task_id", "task_id");
}
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "task_lock_task_id", "task_id");
}
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
{
return null;
}
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
Connection conn = getDBI().open().getConnection();
handle.begin();
conn.setAutoCommit(false);
List<Map<String, Object>> entry = handle.createQuery(
String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn)
).list();
if (entry == null || entry.isEmpty()) {
handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
} else {
handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
}
conn.setAutoCommit(true);
handle.commit();
return null;
}
}
);
}
public DBI getDBI() { return dbi; }
private ConnectionFactory getConnectionFactory(String dbName)

View File

@ -21,6 +21,7 @@ package io.druid.db;
import com.google.common.base.Supplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
@ -30,39 +31,197 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
public abstract class SQLMetadataConnector implements MetadataStorageConnector
public class SQLMetadataConnector implements MetadataStorageConnector
{
private static final Logger log = new Logger(SQLMetadataConnector.class);
private final Supplier<MetadataStorageConnectorConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
protected SQLMetadataConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
public SQLMetadataConnector(Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables)
{
this.config = config;
this.dbTables = dbTables;
}
public abstract void createTable(final IDBI dbi, final String tableName, final String sql);
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase()));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public abstract void createSegmentTable(final IDBI dbi, final String tableName);
public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase()));
if (table.isEmpty()) {
handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute();
}
return null;
}
}
);
}
public abstract void createRulesTable(final IDBI dbi, final String tableName);
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, "
+ "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, "
+ "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "segment_dataSource", "dataSource");
createIndex(dbi, tableName, "segment_used", "used");
}
public abstract void createConfigTable(final IDBI dbi, final String tableName);
public void createRulesTable(final IDBI dbi, final String tableName)
{
System.out.println("creating rule table");
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "rules_dataSource", "dataSource");
}
public abstract void createTaskTable(final IDBI dbi, final String tableName);
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public abstract void createTaskLogTable(final IDBI dbi, final String taskLogsTableName);
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, "
+ "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, "
+ "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "task_active_created_date", "active, created_date");
}
public abstract void createTaskLockTable(final IDBI dbi, final String taskLocksTableName);
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); "
+ "CREATE INDEX task_log_task_id ON %1$s(task_id)",
tableName
)
);
createIndex(dbi, tableName, "task_log_task_id", "task_id");
}
public abstract String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn);
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))",
tableName
)
);
createIndex(dbi, tableName, "task_lock_task_id", "task_id");
}
public abstract DBI getDBI();
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
Connection conn = getDBI().open().getConnection();
handle.begin();
conn.setAutoCommit(false);
List<Map<String, Object>> entry = handle.createQuery(
String.format("SELECT * FROM %1$s WHERE %2$s=:key", tableName, keyColumn)
).list();
if (entry == null || entry.isEmpty()) {
handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
} else {
handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
tableName, keyColumn, valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
}
conn.setAutoCommit(true);
handle.commit();
return null;
}
}
);
}
/* this method should be overwritten for each type of connector */
public DBI getDBI() { return null; }
@Override
public void createSegmentTable() {
@ -95,33 +254,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
}
}
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
final String insertOrUpdateStatement = insertOrUpdateStatement(tableName, keyColumn, valueColumn);
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertOrUpdateStatement)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
@Override
public byte[] lookup(
final String tableName,
@ -171,7 +303,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
protected DataSource getDatasource()
{
MetadataStorageConnectorConfig connectorConfig = config.get();
MetadataStorageConnectorConfig connectorConfig = getConfig();
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
@ -186,4 +318,5 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return dataSource;
}
}