workaround for Derby requiring batch statements

+ make sure payloads are always written as bytes
Xavier Léauté 2014-10-29 13:59:26 -07:00
parent 55d5f1f618
commit 1f171a2b86
4 changed files with 89 additions and 89 deletions
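Context for the two changes: Derby will not execute several semicolon-separated statements through a single JDBC Statement, so table creation now takes a list of statements and runs them as a JDBI batch; and payloads are serialized with writeValueAsBytes so byte-oriented payload columns (e.g. BLOB) receive raw JSON bytes. A minimal sketch of the batch pattern, assuming JDBI v2 and an in-memory Derby URL; the demo table and index names are illustrative, not from this commit:

import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

public class BatchDdlSketch
{
  public static void main(String[] args)
  {
    // In-memory Derby database; requires derby on the classpath.
    final DBI dbi = new DBI("jdbc:derby:memory:demo;create=true");
    dbi.withHandle(
        new HandleCallback<Void>()
        {
          @Override
          public Void withHandle(Handle handle) throws Exception
          {
            // One DDL statement per batch entry; Derby rejects
            // "CREATE TABLE ...; CREATE INDEX ...;" as a single statement.
            final Batch batch = handle.createBatch();
            batch.add("CREATE TABLE demo_segments (id VARCHAR(255) NOT NULL, used BOOLEAN NOT NULL, PRIMARY KEY (id))");
            batch.add("CREATE INDEX idx_demo_segments_used ON demo_segments(used)");
            batch.execute();
            return null;
          }
        }
    );
  }
}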

IndexerSQLMetadataStorageCoordinator.java

@@ -191,7 +191,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
              .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
              .bind("version", segment.getVersion())
              .bind("used", true)
-             .bind("payload", jsonMapper.writeValueAsString(segment))
+             .bind("payload", jsonMapper.writeValueAsBytes(segment))
              .execute();
         log.info("Published segment [%s] to DB", segment.getIdentifier());
@@ -277,7 +277,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
         )
             .bind("id", segment.getIdentifier())
-            .bind("payload", jsonMapper.writeValueAsString(segment))
+            .bind("payload", jsonMapper.writeValueAsBytes(segment))
             .execute();
     }
     catch (IOException e) {
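On the payload side, writeValueAsBytes keeps the JSON as raw UTF-8 bytes all the way to the driver, avoiding character-set conversion when the payload column is binary. A sketch of the round trip, assuming Jackson 2.x and Guava as already used in this codebase; the Map payload is an illustrative stand-in for DataSegment:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

import java.io.IOException;
import java.util.Map;

public class PayloadBytesSketch
{
  public static void main(String[] args) throws IOException
  {
    final ObjectMapper jsonMapper = new ObjectMapper();

    // Write side: serialize straight to UTF-8 bytes, as the commit now does
    // before binding :payload.
    final byte[] payload = jsonMapper.writeValueAsBytes(
        ImmutableMap.of("version", "2014-10-29", "used", true)
    );

    // Read side: deserialize from the same bytes, e.g. after
    // ResultSet#getBytes("payload").
    final Map<?, ?> roundTripped = jsonMapper.readValue(payload, Map.class);
    System.out.println(roundTripped);
  }
}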

SQLMetadataConnector.java

@@ -20,9 +20,11 @@
 package io.druid.db;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import org.apache.commons.dbcp.BasicDataSource;
+import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -75,7 +77,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
   protected abstract boolean tableExists(Handle handle, final String tableName);

-  public void createTable(final IDBI dbi, final String tableName, final String sql)
+  public void createTable(final IDBI dbi, final String tableName, final List<String> sql)
   {
     try {
       dbi.withHandle(
@@ -86,7 +88,11 @@
             {
               if (!tableExists(handle, tableName)) {
                 log.info("Creating table[%s]", tableName);
-                handle.createStatement(sql).execute();
+                final Batch batch = handle.createBatch();
+                for(String s : sql) {
+                  batch.add(s);
+                }
+                batch.execute();
               } else {
                 log.info("Table[%s] already exists", tableName);
               }
@@ -100,44 +106,29 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     }
   }

-  public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) {
-    dbi.withHandle(
-        new HandleCallback<Void>()
-        {
-          @Override
-          public Void withHandle(Handle handle) throws Exception
-          {
-            List<Map<String, Object>> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = '%s'", tableName.toUpperCase()));
-            if (table.isEmpty()) {
-              handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute();
-            }
-            return null;
-          }
-        }
-    );
-  }
-
   public void createSegmentTable(final IDBI dbi, final String tableName)
   {
     createTable(
         dbi,
         tableName,
-        String.format(
-            "CREATE TABLE %1$s (\n"
-            + "  id VARCHAR(255) NOT NULL,\n"
-            + "  dataSource VARCHAR(255) NOT NULL,\n"
-            + "  created_date VARCHAR(255) NOT NULL,\n"
-            + "  start VARCHAR(255) NOT NULL,\n"
-            + "  \"end\" VARCHAR(255) NOT NULL,\n"
-            + "  partitioned BOOLEAN NOT NULL,\n"
-            + "  version VARCHAR(255) NOT NULL,\n"
-            + "  used BOOLEAN NOT NULL,\n"
-            + "  payload %2$s NOT NULL,\n"
-            + "  PRIMARY KEY (id)\n"
-            + ");\n"
-            + "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);\n"
-            + "CREATE INDEX idx_%1$s_used ON %1$s(used);",
-            tableName, getPayloadType()
+        ImmutableList.of(
+            String.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id VARCHAR(255) NOT NULL,\n"
+                + "  dataSource VARCHAR(255) NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  start VARCHAR(255) NOT NULL,\n"
+                + "  \"end\" VARCHAR(255) NOT NULL,\n"
+                + "  partitioned BOOLEAN NOT NULL,\n"
+                + "  version VARCHAR(255) NOT NULL,\n"
+                + "  used BOOLEAN NOT NULL,\n"
+                + "  payload %2$s NOT NULL,\n"
+                + "  PRIMARY KEY (id)\n"
+                + ")",
+                tableName, getPayloadType()
+            ),
+            String.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", tableName),
+            String.format("CREATE INDEX idx_%1$s_used ON %1$s(used);", tableName)
         )
     );
   }
@ -147,19 +138,20 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
createTable( createTable(
dbi, dbi,
tableName, tableName,
String.format( ImmutableList.of(
"CREATE TABLE %1$s (\n" String.format(
+ " id VARCHAR(255) NOT NULL,\n" "CREATE TABLE %1$s (\n"
+ " dataSource VARCHAR(255) NOT NULL,\n" + " id VARCHAR(255) NOT NULL,\n"
+ " version VARCHAR(255) NOT NULL,\n" + " dataSource VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n" + " version VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (id)\n" + " payload %2$s NOT NULL,\n"
+ ");\n" + " PRIMARY KEY (id)\n"
+ "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", + ")",
tableName, getPayloadType() tableName, getPayloadType()
),
String.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", tableName)
) )
); );
createIndex(dbi, tableName, "rules_dataSource", "dataSource");
} }
public void createConfigTable(final IDBI dbi, final String tableName) public void createConfigTable(final IDBI dbi, final String tableName)
@@ -167,13 +159,15 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     createTable(
         dbi,
         tableName,
-        String.format(
-            "CREATE TABLE %1$s (\n"
-            + "  name VARCHAR(255) NOT NULL,\n"
-            + "  payload %2$s NOT NULL,\n"
-            + "  PRIMARY KEY(name)\n"
-            + ");",
-            tableName, getPayloadType()
+        ImmutableList.of(
+            String.format(
+                "CREATE TABLE %1$s (\n"
+                + "  name VARCHAR(255) NOT NULL,\n"
+                + "  payload %2$s NOT NULL,\n"
+                + "  PRIMARY KEY(name)\n"
+                + ")",
+                tableName, getPayloadType()
+            )
         )
     );
   }
@@ -183,18 +177,20 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     createTable(
         dbi,
         tableName,
-        String.format(
-            "CREATE TABLE %1$s (\n"
-            + "  id VARCHAR(255) NOT NULL,\n"
-            + "  created_date VARCHAR(255) NOT NULL,\n"
-            + "  datasource VARCHAR(255) NOT NULL,\n"
-            + "  payload %2$s NOT NULL,\n"
-            + "  status_payload %2$s NOT NULL,\n"
-            + "  active BOOLEAN NOT NULL DEFAULT FALSE,\n"
-            + "  PRIMARY KEY (id)\n"
-            + ");\n"
-            + "CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);",
-            tableName, getPayloadType()
+        ImmutableList.of(
+            String.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id VARCHAR(255) NOT NULL,\n"
+                + "  created_date VARCHAR(255) NOT NULL,\n"
+                + "  datasource VARCHAR(255) NOT NULL,\n"
+                + "  payload %2$s NOT NULL,\n"
+                + "  status_payload %2$s NOT NULL,\n"
+                + "  active BOOLEAN NOT NULL DEFAULT FALSE,\n"
+                + "  PRIMARY KEY (id)\n"
+                + ")",
+                tableName, getPayloadType()
+            ),
+            String.format("CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);", tableName)
        )
     );
   }
@@ -204,15 +200,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     createTable(
         dbi,
         tableName,
-        String.format(
-            "CREATE TABLE %1$s (\n"
-            + "  id %2$s NOT NULL,\n"
-            + "  task_id VARCHAR(255) DEFAULT NULL,\n"
-            + "  log_payload %3$s,\n"
-            + "  PRIMARY KEY (id)\n"
-            + ");\n"
-            + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);",
-            tableName, getSerialType(), getPayloadType()
+        ImmutableList.of(
+            String.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  task_id VARCHAR(255) DEFAULT NULL,\n"
+                + "  log_payload %3$s,\n"
+                + "  PRIMARY KEY (id)\n"
+                + ")",
+                tableName, getSerialType(), getPayloadType()
+            ),
+            String.format("CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", tableName)
        )
     );
   }
@@ -222,15 +220,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     createTable(
         dbi,
         tableName,
-        String.format(
-            "CREATE TABLE %1$s (\n"
-            + "  id %2$s NOT NULL,\n"
-            + "  task_id VARCHAR(255) DEFAULT NULL,\n"
-            + "  lock_payload %3$s,\n"
-            + "  PRIMARY KEY (id)\n"
-            + ");\n"
-            + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);",
-            tableName, getSerialType(), getPayloadType()
+        ImmutableList.of(
+            String.format(
+                "CREATE TABLE %1$s (\n"
+                + "  id %2$s NOT NULL,\n"
+                + "  task_id VARCHAR(255) DEFAULT NULL,\n"
+                + "  lock_payload %3$s,\n"
+                + "  PRIMARY KEY (id)\n"
+                + ")",
+                tableName, getSerialType(), getPayloadType()
+            ),
+            String.format("CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", tableName)
        )
     );
   }

SQLMetadataRuleManager.java

@@ -109,7 +109,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
                 .bind("id", String.format("%s_%s", defaultDatasourceName, version))
                 .bind("dataSource", defaultDatasourceName)
                 .bind("version", version)
-                .bind("payload", jsonMapper.writeValueAsString(defaultRules))
+                .bind("payload", jsonMapper.writeValueAsBytes(defaultRules))
                 .execute();
             return null;
@@ -320,7 +320,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
                 .bind("id", String.format("%s_%s", dataSource, version))
                 .bind("dataSource", dataSource)
                 .bind("version", version)
-                .bind("payload", jsonMapper.writeValueAsString(newRules))
+                .bind("payload", jsonMapper.writeValueAsBytes(newRules))
                 .execute();
             return null;
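With rules now stored as bytes, readers deserialize from byte[] as well. A sketch of the read side using Jackson's byte[] overload of readValue; the Map element type and the "loadForever" rule string are illustrative stand-ins for Druid's Rule class, and the commit itself only touches the write path:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class RulePayloadReadSketch
{
  public static void main(String[] args) throws IOException
  {
    final ObjectMapper jsonMapper = new ObjectMapper();

    // Stand-in for the bytes stored in the rules table's payload column.
    final byte[] payload = jsonMapper.writeValueAsBytes(
        ImmutableList.of(ImmutableMap.of("type", "loadForever"))
    );

    // readValue(byte[], TypeReference) restores the typed list; in Druid
    // this would be List<Rule> instead of List<Map<...>>.
    final List<Map<String, Object>> rules = jsonMapper.readValue(
        payload,
        new TypeReference<List<Map<String, Object>>>() {}
    );
    System.out.println(rules);
  }
}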

SQLMetadataSegmentPublisher.java

@@ -101,7 +101,7 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher
               .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
               .bind("version", segment.getVersion())
               .bind("used", true)
-              .bind("payload", jsonMapper.writeValueAsString(segment))
+              .bind("payload", jsonMapper.writeValueAsBytes(segment))
               .execute();
           return null;
@@ -114,4 +114,4 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher
       throw new RuntimeException(e);
     }
   }
-}
+}