From 1f171a2b8600b96d8349993d3916b8340d54b049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 29 Oct 2014 13:59:26 -0700 Subject: [PATCH] workaround for Derby requiring batch statements + make sure payloads are always written as bytes --- .../IndexerSQLMetadataStorageCoordinator.java | 4 +- .../io/druid/db/SQLMetadataConnector.java | 166 +++++++++--------- .../io/druid/db/SQLMetadataRuleManager.java | 4 +- .../druid/db/SQLMetadataSegmentPublisher.java | 4 +- 4 files changed, 89 insertions(+), 89 deletions(-) diff --git a/server/src/main/java/io/druid/db/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/db/IndexerSQLMetadataStorageCoordinator.java index d35ed8e3693..45ebbdf7567 100644 --- a/server/src/main/java/io/druid/db/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/db/IndexerSQLMetadataStorageCoordinator.java @@ -191,7 +191,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1) .bind("version", segment.getVersion()) .bind("used", true) - .bind("payload", jsonMapper.writeValueAsString(segment)) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); @@ -277,7 +277,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable()) ) .bind("id", segment.getIdentifier()) - .bind("payload", jsonMapper.writeValueAsString(segment)) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) .execute(); } catch (IOException e) { diff --git a/server/src/main/java/io/druid/db/SQLMetadataConnector.java b/server/src/main/java/io/druid/db/SQLMetadataConnector.java index 27f8796b14b..bfcff599a31 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/db/SQLMetadataConnector.java @@ -20,9 +20,11 @@ package io.druid.db; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import org.apache.commons.dbcp.BasicDataSource; +import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; @@ -75,7 +77,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector protected abstract boolean tableExists(Handle handle, final String tableName); - public void createTable(final IDBI dbi, final String tableName, final String sql) + public void createTable(final IDBI dbi, final String tableName, final List sql) { try { dbi.withHandle( @@ -86,7 +88,11 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector { if (!tableExists(handle, tableName)) { log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); + final Batch batch = handle.createBatch(); + for(String s : sql) { + batch.add(s); + } + batch.execute(); } else { log.info("Table[%s] already exists", tableName); } @@ -100,44 +106,29 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector } } - public void createIndex(final IDBI dbi, final String tableName, final String indexName, final String columnName) { - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - List> table = handle.select(String.format("select * from SYS.SYSTABLES where tablename = \'%s\'", tableName.toUpperCase())); - if (table.isEmpty()) { - handle.createStatement(String.format("CREATE INDEX %1$s ON %2$s(%3$s)", indexName, tableName, columnName)).execute(); - } - return null; - } - } - ); - } - public void createSegmentTable(final IDBI dbi, final String tableName) { createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id VARCHAR(255) NOT NULL,\n" - + " dataSource VARCHAR(255) NOT NULL,\n" - + " created_date VARCHAR(255) NOT NULL,\n" - + " start VARCHAR(255) NOT NULL,\n" - + " \"end\" VARCHAR(255) NOT NULL,\n" - + " partitioned BOOLEAN NOT NULL,\n" - + " version VARCHAR(255) NOT NULL,\n" - + " used BOOLEAN NOT NULL,\n" - + " payload %2$s NOT NULL,\n" - + " PRIMARY KEY (id)\n" - + ");\n" - + "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);\n" - + "CREATE INDEX idx_%1$s_used ON %1$s(used);", - tableName, getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " start VARCHAR(255) NOT NULL,\n" + + " \"end\" VARCHAR(255) NOT NULL,\n" + + " partitioned BOOLEAN NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" + + " used BOOLEAN NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ")", + tableName, getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", tableName), + String.format("CREATE INDEX idx_%1$s_used ON %1$s(used);", tableName) ) ); } @@ -147,19 +138,20 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id VARCHAR(255) NOT NULL,\n" - + " dataSource VARCHAR(255) NOT NULL,\n" - + " version VARCHAR(255) NOT NULL,\n" - + " payload %2$s NOT NULL,\n" - + " PRIMARY KEY (id)\n" - + ");\n" - + "CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", - tableName, getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ")", + tableName, getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource);", tableName) ) ); - createIndex(dbi, tableName, "rules_dataSource", "dataSource"); } public void createConfigTable(final IDBI dbi, final String tableName) @@ -167,13 +159,15 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " name VARCHAR(255) NOT NULL,\n" - + " payload %2$s NOT NULL,\n" - + " PRIMARY KEY(name)\n" - + ");", - tableName, getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " name VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " PRIMARY KEY(name)\n" + + ")", + tableName, getPayloadType() + ) ) ); } @@ -183,18 +177,20 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id VARCHAR(255) NOT NULL,\n" - + " created_date VARCHAR(255) NOT NULL,\n" - + " datasource VARCHAR(255) NOT NULL,\n" - + " payload %2$s NOT NULL,\n" - + " status_payload %2$s NOT NULL,\n" - + " active BOOLEAN NOT NULL DEFAULT FALSE,\n" - + " PRIMARY KEY (id)\n" - + ");\n" - + "CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);", - tableName, getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " datasource VARCHAR(255) NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " status_payload %2$s NOT NULL,\n" + + " active BOOLEAN NOT NULL DEFAULT FALSE,\n" + + " PRIMARY KEY (id)\n" + + ")", + tableName, getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date);", tableName) ) ); } @@ -204,15 +200,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id %2$s NOT NULL,\n" - + " task_id VARCHAR(255) DEFAULT NULL,\n" - + " log_payload %3$s,\n" - + " PRIMARY KEY (id)\n" - + ");\n" - + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", - tableName, getSerialType(), getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " task_id VARCHAR(255) DEFAULT NULL,\n" + + " log_payload %3$s,\n" + + " PRIMARY KEY (id)\n" + + ")", + tableName, getSerialType(), getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", tableName) ) ); } @@ -222,15 +220,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector createTable( dbi, tableName, - String.format( - "CREATE TABLE %1$s (\n" - + " id %2$s NOT NULL,\n" - + " task_id VARCHAR(255) DEFAULT NULL,\n" - + " lock_payload %3$s,\n" - + " PRIMARY KEY (id)\n" - + ");\n" - + "CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", - tableName, getSerialType(), getPayloadType() + ImmutableList.of( + String.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " task_id VARCHAR(255) DEFAULT NULL,\n" + + " lock_payload %3$s,\n" + + " PRIMARY KEY (id)\n" + + ")", + tableName, getSerialType(), getPayloadType() + ), + String.format("CREATE INDEX idx_%1$s_task_id ON %1$s(task_id);", tableName) ) ); } diff --git a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java index 3a322f700ce..8ff0749cbab 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java @@ -109,7 +109,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager .bind("id", String.format("%s_%s", defaultDatasourceName, version)) .bind("dataSource", defaultDatasourceName) .bind("version", version) - .bind("payload", jsonMapper.writeValueAsString(defaultRules)) + .bind("payload", jsonMapper.writeValueAsBytes(defaultRules)) .execute(); return null; @@ -320,7 +320,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager .bind("id", String.format("%s_%s", dataSource, version)) .bind("dataSource", dataSource) .bind("version", version) - .bind("payload", jsonMapper.writeValueAsString(newRules)) + .bind("payload", jsonMapper.writeValueAsBytes(newRules)) .execute(); return null; diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java index c5aae87348a..de34571da93 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentPublisher.java @@ -101,7 +101,7 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1) .bind("version", segment.getVersion()) .bind("used", true) - .bind("payload", jsonMapper.writeValueAsString(segment)) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) .execute(); return null; @@ -114,4 +114,4 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher throw new RuntimeException(e); } } -} \ No newline at end of file +}