Merge pull request #400 from tucksaun/feat-postgresql-support

Improving PostgreSQL support
This commit is contained in:
fjy 2014-02-26 09:57:50 -07:00
commit 3d850aa260
4 changed files with 133 additions and 57 deletions

View File

@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
@ -79,7 +80,13 @@ public class ConfigManager
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable); this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
this.insertStatement = String.format( this.insertStatement = String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", DbConnector.isPostgreSQL(dbi) ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
configTable configTable
); );
} }

View File

@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -44,7 +45,11 @@ public class DbConnector
dbi, dbi,
segmentTableName, segmentTableName,
String.format( String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName segmentTableName
) )
); );
@ -56,7 +61,10 @@ public class DbConnector
dbi, dbi,
ruleTableName, ruleTableName,
String.format( String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName ruleTableName
) )
); );
@ -68,7 +76,9 @@ public class DbConnector
dbi, dbi,
configTableName, configTableName,
String.format( String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", isPostgreSQL(dbi) ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName configTableName
) )
); );
@ -80,16 +90,27 @@ public class DbConnector
dbi, dbi,
taskTableName, taskTableName,
String.format( String.format(
"CREATE TABLE `%s` (\n" isPostgreSQL(dbi) ?
+ " `id` varchar(255) NOT NULL,\n" "CREATE TABLE %1$s (\n"
+ " `created_date` tinytext NOT NULL,\n" + " id varchar(255) NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n" + " created_date TEXT NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n" + " datasource varchar(255) NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n" + " payload bytea NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + " status_payload bytea NOT NULL,\n"
+ " PRIMARY KEY (`id`),\n" + " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " KEY (active, created_date(100))\n" + " PRIMARY KEY (id)\n"
+ ")", + ");\n" +
"CREATE INDEX ON %1$s(active, created_date);":
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName taskTableName
) )
); );
@ -101,13 +122,21 @@ public class DbConnector
dbi, dbi,
taskLogsTableName, taskLogsTableName,
String.format( String.format(
"CREATE TABLE `%s` (\n" isPostgreSQL(dbi) ?
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" "CREATE TABLE %1$s (\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n" + " id bigserial NOT NULL,\n"
+ " `log_payload` longblob,\n" + " task_id varchar(255) DEFAULT NULL,\n"
+ " PRIMARY KEY (`id`),\n" + " log_payload bytea,\n"
+ " KEY `task_id` (`task_id`)\n" + " PRIMARY KEY (id)\n"
+ ")", + ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName taskLogsTableName
) )
); );
@ -119,13 +148,21 @@ public class DbConnector
dbi, dbi,
taskLocksTableName, taskLocksTableName,
String.format( String.format(
"CREATE TABLE `%s` (\n" isPostgreSQL(dbi) ?
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" "CREATE TABLE %1$s (\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n" + " id bigserial NOT NULL,\n"
+ " `lock_payload` longblob,\n" + " task_id varchar(255) DEFAULT NULL,\n"
+ " PRIMARY KEY (`id`),\n" + " lock_payload bytea,\n"
+ " KEY `task_id` (`task_id`)\n" + " PRIMARY KEY (id)\n"
+ ")", + ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName taskLocksTableName
) )
); );
@ -144,16 +181,20 @@ public class DbConnector
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) { List<Map<String, Object>> table;
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); if ( isPostgreSQL(dbi) ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
if (table.isEmpty()) { } else {
log.info("Creating table[%s]", tableName); table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
} }
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null; return null;
} }
} }
@ -164,6 +205,25 @@ public class DbConnector
} }
} }
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return isPostgreSQL(handle);
}
}
);
}
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config; private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables; private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi; private final DBI dbi;

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
@ -179,8 +180,11 @@ public class IndexerDBCoordinator
try { try {
handle.createStatement( handle.createStatement(
String.format( String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " DbConnector.isPostgreSQL(handle) ?
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable() dbTables.getSegmentsTable()
) )
) )
@ -196,7 +200,9 @@ public class IndexerDBCoordinator
.execute(); .execute();
log.info("Published segment [%s] to DB", segment.getIdentifier()); log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) { } catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else { } else {
@ -293,11 +299,13 @@ public class IndexerDBCoordinator
new HandleCallback<List<DataSegment>>() new HandleCallback<List<DataSegment>>()
{ {
@Override @Override
public List<DataSegment> withHandle(Handle handle) throws IOException public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException
{ {
return handle.createQuery( return handle.createQuery(
String.format( String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbTables.getSegmentsTable() dbTables.getSegmentsTable()
) )
) )

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final DbTablesConfig config; private final DbTablesConfig config;
private final IDBI dbi; private final IDBI dbi;
private final String statement;
@Inject @Inject
public DbSegmentPublisher( public DbSegmentPublisher(
@ -51,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.config = config; this.config = config;
this.dbi = dbi; this.dbi = dbi;
if (DbConnector.isPostgreSQL(dbi)) {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
} }
public void publishSegment(final DataSegment segment) throws IOException public void publishSegment(final DataSegment segment) throws IOException
@ -82,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
String statement;
if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
handle.createStatement(statement) handle.createStatement(statement)
.bind("id", segment.getIdentifier()) .bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource()) .bind("dataSource", segment.getDataSource())