From b6e198ffb60bf27e883ecfdd5ec4ded8b5c4163c Mon Sep 17 00:00:00 2001 From: sdk Date: Fri, 28 Jun 2013 09:35:48 +0200 Subject: [PATCH 1/4] Postgresql integration. Postgresql included into postgresql-schema.sql --- install/postgresql-schema.sql | 49 +++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 install/postgresql-schema.sql diff --git a/install/postgresql-schema.sql b/install/postgresql-schema.sql new file mode 100644 index 00000000000..180ec5e6702 --- /dev/null +++ b/install/postgresql-schema.sql @@ -0,0 +1,49 @@ +-- Table structure for table `config` +-- + + +DROP TABLE IF EXISTS prod_config; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE prod_config ( + name varchar(255) NOT NULL, + payload bytea NOT NULL, + PRIMARY KEY (name) +); +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `rules` +-- +DROP TABLE IF EXISTS prod_rules; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE prod_rules ( + id varchar(255) NOT NULL, + dataSource varchar(255) NOT NULL, + version text NOT NULL, + payload text NOT NULL, + PRIMARY KEY (id) +); +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `segments` +-- + +DROP TABLE IF EXISTS prod_segments; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE prod_segments ( + id varchar(255) NOT NULL, + dataSource varchar(255) NOT NULL, + created_date text NOT NULL, + start text NOT NULL, + "end" text NOT NULL, + partitioned SMALLINT NOT NULL, + version text NOT NULL, + used boolean NOT NULL, + payload text NOT NULL, + PRIMARY KEY (id) +); + From e9ce6b596c2b6c278fe1f797d83879cd648addc0 Mon Sep 17 00:00:00 2001 From: Pablo Nebrera Date: Fri, 28 Jun 2013 12:49:24 +0200 Subject: [PATCH 2/4] PostgresSQL integration with druid --- .../java/com/metamx/druid/db/DbConnector.java | 15 ++++++++------- .../metamx/druid/realtime/DbSegmentPublisher.java | 2 +- .../com/metamx/druid/db/DatabaseRuleManager.java | 2 +- .../metamx/druid/db/DatabaseSegmentManager.java | 10 +++++----- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index cb202b1a8f2..a76536bed6e 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -140,15 +140,16 @@ public class DbConnector @Override public Void withHandle(Handle handle) throws Exception { - List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); + if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) { + List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); + if (table.isEmpty()) { + log.info("Creating table[%s]", tableName); + handle.createStatement(sql).execute(); + } else { + log.info("Table[%s] existed: [%s]", tableName, table); + } } - return null; } } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java index 7a7e0e8ed7f..67177e8f45c 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java @@ -62,7 +62,7 @@ public class DbSegmentPublisher implements SegmentPublisher { handle.createStatement( String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", config.getSegmentTable() ) diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java index 708c61d9e24..44ea802082c 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java @@ -192,7 +192,7 @@ public class DatabaseRuleManager return handle.createQuery( // Return latest version rule by dataSource String.format( - "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version", + "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version", config.getRuleTable() ) ).fold( diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index 2ea9056627f..53c853d5a21 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -203,7 +203,7 @@ public class DatabaseSegmentManager for (DataSegment segment : segments) { batch.add( String.format( - "UPDATE %s SET used=1 WHERE id = '%s'", + "UPDATE %s SET used=true WHERE id = '%s'", config.getSegmentTable(), segment.getIdentifier() ) @@ -234,7 +234,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable()) + String.format("UPDATE %s SET used=true WHERE id = :id", config.getSegmentTable()) ) .bind("id", segmentId) .execute(); @@ -268,7 +268,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable()) + String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", config.getSegmentTable()) ) .bind("dataSource", ds) .execute(); @@ -298,7 +298,7 @@ public class DatabaseSegmentManager public Void withHandle(Handle handle) throws Exception { handle.createStatement( - String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable()) + String.format("UPDATE %s SET used=false WHERE id = :segmentID", config.getSegmentTable()) ).bind("segmentID", segmentID) .execute(); @@ -398,7 +398,7 @@ public class DatabaseSegmentManager public List> withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable()) + String.format("SELECT payload FROM %s WHERE used=true", config.getSegmentTable()) ).list(); } } From 83b96ee507fb59d246764f2656900c197c4b035d Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 8 Jul 2013 10:40:13 -0700 Subject: [PATCH 3/4] fix concurrency problem --- .../java/com/metamx/druid/curator/announcement/Announcer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 76529a0b5ab..5f4d00011cf 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -227,10 +227,9 @@ public class Announcer boolean created = false; synchronized (toAnnounce) { if (started) { - byte[] oldBytes = subPaths.get(pathAndNode.getNode()); + byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes); if (oldBytes == null) { - subPaths.put(pathAndNode.getNode(), bytes); created = true; } else if (!Arrays.equals(oldBytes, bytes)) { throw new IAE("Cannot reannounce different values under the same path"); From adda1488dcf6c11366d788d38e02119365fc281f Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 8 Jul 2013 10:59:07 -0700 Subject: [PATCH 4/4] another thread safety fix --- .../druid/curator/announcement/Announcer.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 5f4d00011cf..708e056897c 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -64,6 +64,8 @@ public class Announcer private final ConcurrentMap listeners = new MapMaker().makeMap(); private final ConcurrentMap> announcements = new MapMaker().makeMap(); + private final Object lock = new Object(); + private boolean started = false; public Announcer( @@ -249,28 +251,30 @@ public class Announcer public void update(final String path, final byte[] bytes) { - final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + synchronized (lock) { + final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - final String parentPath = pathAndNode.getPath(); - final String nodePath = pathAndNode.getNode(); + final String parentPath = pathAndNode.getPath(); + final String nodePath = pathAndNode.getNode(); - ConcurrentMap subPaths = announcements.get(parentPath); + ConcurrentMap subPaths = announcements.get(parentPath); - if (subPaths == null || subPaths.get(nodePath) == null) { - announce(path, bytes); - return; - } - - try { - byte[] oldBytes = subPaths.get(nodePath); - - if (!Arrays.equals(oldBytes, bytes)) { - subPaths.put(nodePath, bytes); - updateAnnouncement(path, bytes); + if (subPaths == null || subPaths.get(nodePath) == null) { + announce(path, bytes); + return; + } + + try { + byte[] oldBytes = subPaths.get(nodePath); + + if (!Arrays.equals(oldBytes, bytes)) { + subPaths.put(nodePath, bytes); + updateAnnouncement(path, bytes); + } + } + catch (Exception e) { + throw Throwables.propagate(e); } - } - catch (Exception e) { - throw Throwables.propagate(e); } }