simplify SQL metadata storage

This commit is contained in:
Xavier Léauté 2014-10-28 18:00:16 -07:00
parent aa754b86e8
commit 92afd10947
5 changed files with 129 additions and 253 deletions

View File

@ -36,6 +36,9 @@ import java.util.Map;
public class MySQLConnector extends SQLMetadataConnector public class MySQLConnector extends SQLMetadataConnector
{ {
private static final Logger log = new Logger(MySQLConnector.class); private static final Logger log = new Logger(MySQLConnector.class);
private static final String PAYLOAD_TYPE = "LONGBLOB";
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private final DBI dbi; private final DBI dbi;
@Inject @Inject
@ -54,6 +57,18 @@ public class MySQLConnector extends SQLMetadataConnector
}); });
} }
@Override
protected String getPayloadType()
{
return PAYLOAD_TYPE;
}
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override @Override
public void createTable(final IDBI dbi, final String tableName, final String sql) public void createTable(final IDBI dbi, final String tableName, final String sql)
{ {
@ -81,107 +96,6 @@ public class MySQLConnector extends SQLMetadataConnector
} }
} }
@Override
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, "
+ "start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
tableName
)
);
}
@Override
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
tableName
)
);
}
@Override
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
@Override
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
tableName
)
);
}
@Override
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
@Override
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
@Override @Override
public Void insertOrUpdate( public Void insertOrUpdate(
final String tableName, final String tableName,
@ -197,10 +111,14 @@ public class MySQLConnector extends SQLMetadataConnector
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement(String.format( handle.createStatement(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", String.format(
tableName, keyColumn, valueColumn "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
)) tableName,
keyColumn,
valueColumn
)
)
.bind("key", key) .bind("key", key)
.bind("value", value) .bind("value", value)
.execute(); .execute();

View File

@ -36,6 +36,9 @@ import java.util.Map;
public class PostgreSQLConnector extends SQLMetadataConnector public class PostgreSQLConnector extends SQLMetadataConnector
{ {
private static final Logger log = new Logger(PostgreSQLConnector.class); private static final Logger log = new Logger(PostgreSQLConnector.class);
private static final String PAYLOAD_TYPE = "BYTEA";
private static final String SERIAL_TYPE = "BIGSERIAL";
private final DBI dbi; private final DBI dbi;
@Inject @Inject
@ -46,6 +49,17 @@ public class PostgreSQLConnector extends SQLMetadataConnector
} }
@Override
protected String getPayloadType() {
return PAYLOAD_TYPE;
}
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override @Override
public void createTable(final IDBI dbi, final String tableName, final String sql) public void createTable(final IDBI dbi, final String tableName, final String sql)
{ {
@ -73,110 +87,6 @@ public class PostgreSQLConnector extends SQLMetadataConnector
} }
} }
@Override
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, "
+ "start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);",
tableName
)
);
}
@Override
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);",
tableName
)
);
}
@Override
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
@Override
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);",
tableName
)
);
}
@Override
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
@Override
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
@Override @Override
public Void insertOrUpdate( public Void insertOrUpdate(
final String tableName, final String tableName,
@ -192,14 +102,18 @@ public class PostgreSQLConnector extends SQLMetadataConnector
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement(String.format( handle.createStatement(
"BEGIN;\n" + String.format(
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + "BEGIN;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
"COMMIT;", " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
tableName, keyColumn, valueColumn "COMMIT;",
)) tableName,
keyColumn,
valueColumn
)
)
.bind("key", key) .bind("key", key)
.bind("value", value) .bind("value", value)
.execute(); .execute();

View File

@ -22,7 +22,6 @@ package io.druid.db;
import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import org.apache.derby.drda.NetworkServerControl; import org.apache.derby.drda.NetworkServerControl;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.tweak.ConnectionFactory; import org.skife.jdbi.v2.tweak.ConnectionFactory;
@ -31,6 +30,7 @@ import java.net.InetAddress;
public class DerbyConnector extends SQLMetadataConnector public class DerbyConnector extends SQLMetadataConnector
{ {
private static final String SERIAL_TYPE = "BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)";
private final DBI dbi; private final DBI dbi;
@Inject @Inject
@ -40,6 +40,12 @@ public class DerbyConnector extends SQLMetadataConnector
this.dbi = new DBI(getConnectionFactory("druidDerbyDb")); this.dbi = new DBI(getConnectionFactory("druidDerbyDb"));
} }
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override @Override
public DBI getDBI() { return dbi; } public DBI getDBI() { return dbi; }

View File

@ -28,6 +28,7 @@ import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
import java.sql.Blob;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -48,17 +49,14 @@ public class DerbyMetadataRuleManager extends SQLMetadataRuleManager
@Override @Override
protected List<Rule> getRules(Map<String, Object> stringObjectMap) { protected List<Rule> getRules(Map<String, Object> stringObjectMap) {
List<Rule> rules = null;
try { try {
java.sql.Clob payload = (java.sql.Clob)stringObjectMap.get("payload"); Blob payload = (Blob)stringObjectMap.get("payload");
rules = jsonMapper.readValue( List<Rule> rules = jsonMapper.readValue(
payload.getSubString(1, (int)payload.length()), new TypeReference<List<Rule>>() payload.getBinaryStream(), new TypeReference<List<Rule>>() {}
{
}
); );
return rules;
} catch (Exception e) { } catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
return rules;
} }
} }

View File

@ -37,9 +37,11 @@ import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class SQLMetadataConnector implements MetadataStorageConnector public abstract class SQLMetadataConnector implements MetadataStorageConnector
{ {
private static final Logger log = new Logger(SQLMetadataConnector.class); private static final Logger log = new Logger(SQLMetadataConnector.class);
private static final String PAYLOAD_TYPE = "BLOB";
private final Supplier<MetadataStorageConnectorConfig> config; private final Supplier<MetadataStorageConnectorConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables; private final Supplier<MetadataStorageTablesConfig> dbTables;
@ -50,6 +52,12 @@ public class SQLMetadataConnector implements MetadataStorageConnector
this.dbTables = dbTables; this.dbTables = dbTables;
} }
protected String getPayloadType() {
return PAYLOAD_TYPE;
}
protected abstract String getSerialType();
public void createTable(final IDBI dbi, final String tableName, final String sql) public void createTable(final IDBI dbi, final String tableName, final String sql)
{ {
try { try {
@ -99,25 +107,40 @@ public class SQLMetadataConnector implements MetadataStorageConnector
dbi, dbi,
tableName, tableName,
String.format( String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " "CREATE TABLE %1$s (\n"
+ "start VARCHAR(255) NOT NULL, \"end\" VARCHAR(255) NOT NULL, partitioned SMALLINT NOT NULL, version VARCHAR(255) NOT NULL, " + " id VARCHAR(255) NOT NULL,\n"
+ "used BOOLEAN NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", + " dataSource VARCHAR(255) NOT NULL,\n"
tableName + " created_date VARCHAR(255) NOT NULL,\n"
+ " start VARCHAR(255) NOT NULL,\n"
+ " \"end\" VARCHAR(255) NOT NULL,\n"
+ " partitioned BOOLEAN NOT NULL,\n"
+ " version VARCHAR(255) NOT NULL,\n"
+ " used BOOLEAN NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"
+ "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);\n"
+ "CREATE INDEX idx_%1$s_used ON %1$s(used);",
tableName, getPayloadType()
) )
); );
createIndex(dbi, tableName, "segment_dataSource", "dataSource");
createIndex(dbi, tableName, "segment_used", "used");
} }
public void createRulesTable(final IDBI dbi, final String tableName) public void createRulesTable(final IDBI dbi, final String tableName)
{ {
System.out.println("creating rule table");
createTable( createTable(
dbi, dbi,
tableName, tableName,
String.format( String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version VARCHAR(255) NOT NULL, payload CLOB NOT NULL, PRIMARY KEY (id))", "CREATE TABLE %1$s (\n"
tableName + " id VARCHAR(255) NOT NULL,\n"
+ " dataSource VARCHAR(255) NOT NULL,\n"
+ " version VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"
+ "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);",
tableName, getPayloadType()
) )
); );
createIndex(dbi, tableName, "rules_dataSource", "dataSource"); createIndex(dbi, tableName, "rules_dataSource", "dataSource");
@ -129,8 +152,12 @@ public class SQLMetadataConnector implements MetadataStorageConnector
dbi, dbi,
tableName, tableName,
String.format( String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", "CREATE TABLE %1$s (\n"
tableName + " name VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " PRIMARY KEY(name)\n"
+ ");",
tableName, getPayloadType()
) )
); );
} }
@ -140,13 +167,20 @@ public class SQLMetadataConnector implements MetadataStorageConnector
createTable( createTable(
dbi, dbi,
tableName, tableName,
String.format("CREATE TABLE %s (id VARCHAR(255) NOT NULL, created_date VARCHAR(255) NOT NULL, " String.format(
+ "datasource VARCHAR(255) NOT NULL, payload CLOB NOT NULL, status_payload CLOB NOT NULL, " "CREATE TABLE %1$s (\n"
+ "active SMALLINT NOT NULL DEFAULT 0, PRIMARY KEY (id))", + " id VARCHAR(255) NOT NULL,\n"
tableName + " created_date VARCHAR(255) NOT NULL,\n"
+ " datasource VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " status_payload %2$s NOT NULL,\n"
+ " active BOOLEAN NOT NULL DEFAULT FALSE,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"
+ "CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);",
tableName, getPayloadType()
) )
); );
createIndex(dbi, tableName, "task_active_created_date", "active, created_date");
} }
public void createTaskLogTable(final IDBI dbi, final String tableName) public void createTaskLogTable(final IDBI dbi, final String tableName)
@ -155,13 +189,16 @@ public class SQLMetadataConnector implements MetadataStorageConnector
dbi, dbi,
tableName, tableName,
String.format( String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " "CREATE TABLE %1$s (\n"
+ "task_id VARCHAR(255) DEFAULT NULL, log_payload CLOB, PRIMARY KEY (id)); " + " id %2$s NOT NULL,\n"
+ "CREATE INDEX task_log_task_id ON %1$s(task_id)", + " task_id VARCHAR(255) DEFAULT NULL,\n"
tableName + " log_payload %3$s,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"
+ "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);",
tableName, getSerialType(), getPayloadType()
) )
); );
createIndex(dbi, tableName, "task_log_task_id", "task_id");
} }
public void createTaskLockTable(final IDBI dbi, final String tableName) public void createTaskLockTable(final IDBI dbi, final String tableName)
@ -170,12 +207,16 @@ public class SQLMetadataConnector implements MetadataStorageConnector
dbi, dbi,
tableName, tableName,
String.format( String.format(
"CREATE TABLE %s (id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " "CREATE TABLE %1$s (\n"
+ "task_id VARCHAR(255) DEFAULT NULL, lock_payload CLOB, PRIMARY KEY (id))", + " id %2$s NOT NULL,\n"
tableName + " task_id VARCHAR(255) DEFAULT NULL,\n"
+ " lock_payload %3$s,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"
+ "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);",
tableName, getSerialType(), getPayloadType()
) )
); );
createIndex(dbi, tableName, "task_lock_task_id", "task_id");
} }
@Override @Override
@ -220,8 +261,7 @@ public class SQLMetadataConnector implements MetadataStorageConnector
); );
} }
/* this method should be overwritten for each type of connector */ public abstract DBI getDBI();
public DBI getDBI() { return null; }
@Override @Override
public void createSegmentTable() { public void createSegmentTable() {