diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java index 172c82a6e1c..308fd392fd3 100644 --- a/common/src/main/java/io/druid/common/config/ConfigManager.java +++ b/common/src/main/java/io/druid/common/config/ConfigManager.java @@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import org.joda.time.Duration; import org.skife.jdbi.v2.Handle; @@ -79,7 +80,13 @@ public class ConfigManager this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable); this.insertStatement = String.format( - "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", + DbConnector.isPostgreSQL(dbi) ? + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" + + " INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;" : + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", configTable ); } diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java index 585276ade91..02b2bac2b63 100644 --- a/common/src/main/java/io/druid/db/DbConnector.java +++ b/common/src/main/java/io/druid/db/DbConnector.java @@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.tweak.HandleCallback; import javax.sql.DataSource; +import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -44,7 +45,11 @@ public class DbConnector dbi, segmentTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + + "CREATE INDEX ON %1$s(dataSource);"+ + "CREATE INDEX ON %1$s(used);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", segmentTableName ) ); @@ -56,7 +61,10 @@ public class DbConnector dbi, ruleTableName, String.format( - "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ + "CREATE INDEX ON %1$s(dataSource);": + "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", ruleTableName ) ); @@ -68,7 +76,9 @@ public class DbConnector dbi, configTableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", + isPostgreSQL(dbi) ? + "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))": + "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", configTableName ) ); @@ -80,16 +90,27 @@ public class DbConnector dbi, taskTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` varchar(255) NOT NULL,\n" - + " `created_date` tinytext NOT NULL,\n" - + " `datasource` varchar(255) NOT NULL,\n" - + " `payload` longblob NOT NULL,\n" - + " `status_payload` longblob NOT NULL,\n" - + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" - + " PRIMARY KEY (`id`),\n" - + " KEY (active, created_date(100))\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id varchar(255) NOT NULL,\n" + + " created_date TEXT NOT NULL,\n" + + " datasource varchar(255) NOT NULL,\n" + + " payload bytea NOT NULL,\n" + + " status_payload bytea NOT NULL,\n" + + " active SMALLINT NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (id)\n" + + ");\n" + + "CREATE INDEX ON %1$s(active, created_date);": + "CREATE TABLE `%s` (\n" + + " `id` varchar(255) NOT NULL,\n" + + " `created_date` tinytext NOT NULL,\n" + + " `datasource` varchar(255) NOT NULL,\n" + + " `payload` longblob NOT NULL,\n" + + " `status_payload` longblob NOT NULL,\n" + + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (`id`),\n" + + " KEY (active, created_date(100))\n" + + ")", taskTableName ) ); @@ -101,13 +122,21 @@ public class DbConnector dbi, taskLogsTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `log_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " log_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `log_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLogsTableName ) ); @@ -119,13 +148,21 @@ public class DbConnector dbi, taskLocksTableName, String.format( - "CREATE TABLE `%s` (\n" - + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" - + " `task_id` varchar(255) DEFAULT NULL,\n" - + " `lock_payload` longblob,\n" - + " PRIMARY KEY (`id`),\n" - + " KEY `task_id` (`task_id`)\n" - + ")", + isPostgreSQL(dbi) ? + "CREATE TABLE %1$s (\n" + + " id bigserial NOT NULL,\n" + + " task_id varchar(255) DEFAULT NULL,\n" + + " lock_payload bytea,\n" + + " PRIMARY KEY (id)\n" + + ");\n"+ + "CREATE INDEX ON %1$s(task_id);": + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `lock_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", taskLocksTableName ) ); @@ -144,16 +181,20 @@ public class DbConnector @Override public Void withHandle(Handle handle) throws Exception { - if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) { - List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); - - if (table.isEmpty()) { - log.info("Creating table[%s]", tableName); - handle.createStatement(sql).execute(); - } else { - log.info("Table[%s] existed: [%s]", tableName, table); - } + List> table; + if ( isPostgreSQL(dbi) ) { + table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName)); + } else { + table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); } + + if (table.isEmpty()) { + log.info("Creating table[%s]", tableName); + handle.createStatement(sql).execute(); + } else { + log.info("Table[%s] existed: [%s]", tableName, table); + } + return null; } } @@ -164,6 +205,25 @@ public class DbConnector } } + public static Boolean isPostgreSQL(final IDBI dbi) + { + return dbi.withHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + return isPostgreSQL(handle); + } + } + ); + } + + public static Boolean isPostgreSQL(final Handle handle) throws SQLException + { + return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); + } + private final Supplier config; private final Supplier dbTables; private final DBI dbi; diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md index 125e15c716e..6bdc21cae88 100644 --- a/docs/content/Batch-ingestion.md +++ b/docs/content/Batch-ingestion.md @@ -82,7 +82,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim "segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here", "leaveIntermediate": "false", "partitionsSpec": { - "type": "random" + "type": "hashed" "targetPartitionSize": 5000000 }, "updaterJobSpec": { @@ -147,13 +147,15 @@ The indexing process has the ability to roll data up as it processes the incomin ### Partitioning specification Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type. -Druid supports two types of partitions spec - singleDimension and random. +Druid supports two types of partitions spec - singleDimension and hashed. In SingleDimension partition type data is partitioned based on the values in that dimension. For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z. -In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row. -Random partition type is more efficient and gives better distribution of data. +In hashed partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row. + +It is recommended to use Hashed partition as it is more efficient than singleDimension since it does not need to determine the dimension for creating partitions. +Hashing also gives better distribution of data resulting in equal sized partitons and improving query performance To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own. diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 29950fae345..407cf84dec3 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -99,7 +99,9 @@ public class DetermineHashedPartitionsJob implements Jobby groupByJob.setOutputKeyClass(NullWritable.class); groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); + if(!config.getSegmentGranularIntervals().isPresent()){ groupByJob.setNumReduceTasks(1); + } JobHelper.setupClasspath(config, groupByJob); config.addInputPaths(groupByJob); @@ -294,7 +296,7 @@ public class DetermineHashedPartitionsJob implements Jobby { HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE); for (BytesWritable value : values) { - HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes()); + HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value)); try { aggregate.addAll(logValue); } @@ -324,6 +326,13 @@ public class DetermineHashedPartitionsJob implements Jobby } } + private byte[] getDataBytes(BytesWritable writable) + { + byte[] rv = new byte[writable.getLength()]; + System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength()); + return rv; + } + @Override public void run(Context context) throws IOException, InterruptedException diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index e2284988dd9..890a3516189 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -44,6 +44,7 @@ import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -416,25 +417,42 @@ public class DeterminePartitionsJob implements Jobby } public static class DeterminePartitionsDimSelectionPartitioner - extends Partitioner + extends Partitioner implements Configurable { + private Configuration config; + @Override public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) { final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes final int index = bytes.getInt(); - - if (index >= numPartitions) { - throw new ISE( - "Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!", - index, - numPartitions - ); + if (config.get("mapred.job.tracker").equals("local")) { + return index % numPartitions; + } else { + if (index >= numPartitions) { + throw new ISE( + "Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!", + index, + numPartitions + ); + } } return index; } + + @Override + public Configuration getConf() + { + return config; + } + + @Override + public void setConf(Configuration config) + { + this.config = config; + } } private static abstract class DeterminePartitionsDimSelectionBaseReducer diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java new file mode 100644 index 00000000000..d164cef1638 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexer.partitions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.indexer.DetermineHashedPartitionsJob; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.Jobby; + +import javax.annotation.Nullable; + +public class HashedPartitionsSpec extends AbstractPartitionsSpec +{ + @JsonCreator + public HashedPartitionsSpec( + @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, + @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize, + @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped + ) + { + super(targetPartitionSize, maxPartitionSize, assumeGrouped); + } + + @Override + public Jobby getPartitionJob(HadoopDruidIndexerConfig config) + { + return new DetermineHashedPartitionsJob(config); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java index 2186c584879..cce5de8becf 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java @@ -29,7 +29,8 @@ import io.druid.indexer.Jobby; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), - @JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class) + @JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class), + @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class) }) public interface PartitionsSpec { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java index 7b13a1bf663..30f13f49478 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java @@ -27,7 +27,9 @@ import io.druid.indexer.Jobby; import javax.annotation.Nullable; -public class RandomPartitionsSpec extends AbstractPartitionsSpec +// for backward compatibility +@Deprecated +public class RandomPartitionsSpec extends HashedPartitionsSpec { @JsonCreator public RandomPartitionsSpec( @@ -38,10 +40,4 @@ public class RandomPartitionsSpec extends AbstractPartitionsSpec { super(targetPartitionSize, maxPartitionSize, assumeGrouped); } - - @Override - public Jobby getPartitionJob(HadoopDruidIndexerConfig config) - { - return new DetermineHashedPartitionsJob(config); - } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index f6c8a1f66d0..c6bb0ba719f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.db.DbConnectorConfig; import io.druid.indexer.granularity.UniformGranularitySpec; +import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.partitions.RandomPartitionsSpec; import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; @@ -501,6 +502,7 @@ public class HadoopDruidIndexerConfigTest } } + @Test public void testRandomPartitionsSpec() throws Exception{ { final HadoopDruidIndexerConfig cfg; @@ -543,4 +545,48 @@ public class HadoopDruidIndexerConfigTest Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec); } } + + @Test + public void testHashedPartitionsSpec() throws Exception{ + { + final HadoopDruidIndexerConfig cfg; + + try { + cfg = jsonReadWriteRead( + "{" + + "\"partitionsSpec\":{" + + " \"targetPartitionSize\":100," + + " \"type\":\"hashed\"" + + " }" + + "}", + HadoopDruidIndexerConfig.class + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec(); + + Assert.assertEquals( + "isDeterminingPartitions", + partitionsSpec.isDeterminingPartitions(), + true + ); + + Assert.assertEquals( + "getTargetPartitionSize", + partitionsSpec.getTargetPartitionSize(), + 100 + ); + + Assert.assertEquals( + "getMaxPartitionSize", + partitionsSpec.getMaxPartitionSize(), + 150 + ); + + Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof HashedPartitionsSpec); + } + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java index 7e5f3ef48dd..dc02ee9d4ef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java @@ -28,6 +28,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; @@ -179,8 +180,11 @@ public class IndexerDBCoordinator try { handle.createStatement( String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + DbConnector.isPostgreSQL(handle) ? + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)": + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", dbTables.getSegmentsTable() ) ) @@ -196,7 +200,9 @@ public class IndexerDBCoordinator .execute(); log.info("Published segment [%s] to DB", segment.getIdentifier()); - } catch (Exception e) { + } catch(SQLException e) { + throw new IOException(e); + } catch(Exception e) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); } else { @@ -293,11 +299,13 @@ public class IndexerDBCoordinator new HandleCallback>() { @Override - public List withHandle(Handle handle) throws IOException + public List withHandle(Handle handle) throws IOException, SQLException { return handle.createQuery( String.format( - "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + DbConnector.isPostgreSQL(handle)? + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0": + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", dbTables.getSegmentsTable() ) ) diff --git a/publications/whitepaper/druid.bib b/publications/whitepaper/druid.bib index 22a9f24d427..07b824f3f94 100644 --- a/publications/whitepaper/druid.bib +++ b/publications/whitepaper/druid.bib @@ -141,7 +141,7 @@ title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second}, month = {April}, year = {2011}, - howpublished = "\url{http://metamarkets.com/2011/druid-part-i-real-time-analytics-at-a-billion-rows-per-second/}" + howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}" } @article{farber2012sap, diff --git a/publications/whitepaper/druid.pdf b/publications/whitepaper/druid.pdf index b8f30273680..7cfc29d8390 100644 Binary files a/publications/whitepaper/druid.pdf and b/publications/whitepaper/druid.pdf differ diff --git a/publications/whitepaper/druid.tex b/publications/whitepaper/druid.tex index 9338f51bfc6..8f417cfbfcc 100644 --- a/publications/whitepaper/druid.tex +++ b/publications/whitepaper/druid.tex @@ -96,9 +96,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from the point of view of how data flows through the system in Section \ref{sec:architecture}. We then discuss how and why data gets converted into a binary format in Section \ref{sec:storage-format}. We briefly describe the -query API in Section \ref{sec:query-api}. Lastly, we leave off with some -benchmarks in Section \ref{sec:benchmarks}, related work in Section -\ref{sec:related} and conclusions are Section \ref{sec:conclusions}. +query API in Section \ref{sec:query-api} and present our experimental results +in Section \ref{sec:benchmarks}. Lastly, we leave off with our learnings from +running Druid in production in Section \ref{sec:production}, related work +in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}. \section{Problem Definition} \label{sec:problem-definition} @@ -139,13 +140,14 @@ want queries over any arbitrary combination of dimensions to return with sub-second latencies. The need for Druid was facilitated by the fact that existing open source -Relational Database Management Systems and NoSQL key/value stores were unable -to provide a low latency data ingestion and query platform for interactive -applications \cite{tschetter2011druid}. In the early days of Metamarkets, we -were focused on building a hosted dashboard that would allow users to arbitrary -explore and visualize event streams. The data store powering the dashboard -needed to return queries fast enough that the data visualizations built on top -of it could provide users with an interactive experience. +Relational Database Management Systems (RDBMS) and NoSQL key/value stores were +unable to provide a low latency data ingestion and query platform for +interactive applications \cite{tschetter2011druid}. In the early days of +Metamarkets, we were focused on building a hosted dashboard that would allow +users to arbitrary explore and visualize event streams. The data store +powering the dashboard needed to return queries fast enough that the data +visualizations built on top of it could provide users with an interactive +experience. In addition to the query latency needs, the system had to be multi-tenant and highly available. The Metamarkets product is used in a highly concurrent @@ -188,6 +190,7 @@ Figure~\ref{fig:cluster}. \label{fig:cluster} \end{figure*} +\newpage \subsection{Real-time Nodes} \label{sec:realtime} Real-time nodes encapsulate the functionality to ingest and query event @@ -670,7 +673,8 @@ ability to handle complex nested filter sets is what enables Druid to drill into data at any depth. The exact query syntax depends on the query type and the information requested. -A sample count query over a week of data is shown below: +A sample count query over a week of data is as follows: +\newpage \begin{verbatim} { "queryType" : "timeseries", @@ -688,7 +692,6 @@ A sample count query over a week of data is shown below: } ] } \end{verbatim} - The query shown above will return a count of the number of rows in the Wikipedia datasource from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form: @@ -713,7 +716,6 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array } } ] \end{verbatim} - Druid supports many types of aggregations including double sums, long sums, minimums, maximums, and several others. Druid also supports complex aggregations such as cardinality estimation and approximate quantile estimation. The @@ -723,9 +725,15 @@ filter and group results based on almost any arbitrary condition. It is beyond the scope of this paper to fully describe the query API but more information can be found online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}. -We are also in the process of extending the Druid API to understand SQL. +At the time of writing, the query language does not support joins. Although the +storage format is able to support joins, we've targeted Druid at user-facing +workloads that must return in a matter of seconds, and as such, we've chosen to +not spend the time to implement joins as it has been our experience that +requiring joins on your queries often limits the performance you can achieve. +Implemting joins and extending the Druid API to understand SQL is something +we'd like to do in future work. -\section{Performance Benchmarks} +\section{Experimental Results} \label{sec:benchmarks} To illustrate Druid's performance, we conducted a series of experiments that focused on measuring Druid's query and data ingestion capabilities. @@ -768,11 +776,15 @@ Please note: 1 & \texttt{SELECT count(*) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline 2 & \texttt{SELECT count(*), sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline 3 & \texttt{SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline - 4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline - 5 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline - 6 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline - \end{tabular} -\end{table*} + 4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ +WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER +BY cnt limit 100} \\ \hline 5 & \texttt{SELECT high\_card\_dimension, count(*) +AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? +GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline 6 & +\texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), +sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ +? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ +\hline \end{tabular} \end{table*} Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and Figure~\ref{fig:core_scan_rate} shows the core scan rate. In @@ -787,20 +799,12 @@ remain nearly constant. The increase in speed of a parallel computing system is often limited by the time needed for the sequential operations of the system, in accordance with Amdahl's law \cite{amdahl1967validity}. -\begin{figure} -\centering -\includegraphics[width = 2.8in]{cluster_scan_rate} -\caption{Druid cluster scan rate with lines indicating linear scaling - from 25 nodes.} -\label{fig:cluster_scan_rate} -\end{figure} +\begin{figure} \centering \includegraphics[width = 2.8in]{cluster_scan_rate} +\caption{Druid cluster scan rate with lines indicating linear scaling from 25 +nodes.} \label{fig:cluster_scan_rate} \end{figure} -\begin{figure} -\centering -\includegraphics[width = 2.8in]{core_scan_rate} -\caption{Druid core scan rate.} -\label{fig:core_scan_rate} -\end{figure} +\begin{figure} \centering \includegraphics[width = 2.8in]{core_scan_rate} +\caption{Druid core scan rate.} \label{fig:core_scan_rate} \end{figure} The first query listed in Table~\ref{tab:sql_queries} is a simple count, achieving scan rates of 33M rows/second/core. We believe @@ -878,6 +882,87 @@ than the number of dimensions.} \label{fig:throughput_vs_num_metrics} \end{figure} +\section{Druid in Production} +\label{sec:production} +Over the last few years of using Druid, we've gained tremendous +knowledge about handling production workloads, setting up correct operational +monitoring, integrating Druid with other products as part of a more +sophisticated data analytics stack, and distributing data to handle entire data +center outages. One of the most important lessons we've learned is that no +amount of testing can accurately simulate a production environment, and failures +will occur for every imaginable and unimaginable reason. Interestingly, most of +our most severe crashes were due to misunderstanding the impacts a +seemingly small feature would have on the overall system. + +Some of our more interesting observations include: +\begin{itemize} +\item Druid is most often used in production to power exploratory dashboards. +Interestingly, because many users of explatory dashboards are not from +technical backgrounds, they often issue queries without understanding the +impacts to the underlying system. For example, some users become impatient that +their queries for terabytes of data do not return in milliseconds and +continously refresh their dashboard view, generating heavy load to Druid. This +type of usage forced Druid to better defend itself against expensive repetitive +queries. + +\item Cluster query performance benefits from multitenancy. Hosting every +production datasource in the same cluster leads to better data parallelization +as additional nodes are added. + +\item Even if you provide users with the ability to arbitrarily explore data, they +often only have a few questions in mind. Caching is extremely important, and in +fact we see a very high percentage of our query results come from the broker cache. + +\item When using a memory mapped storage engine, even a small amount of paging +data from disk can severely impact query performance. SSDs can greatly solve +this problem. + +\item Leveraging approximate algorithms can greatly reduce data storage costs and +improve query performance. Many users do not care about exact answers to their +questions and are comfortable with a few percentage points of error. +\end{itemize} + +\subsection{Operational Monitoring} +Proper monitoring is critical to run a large scale distributed cluster. +Each Druid node is designed to periodically emit a set of operational metrics. +These metrics may include system level data such as CPU usage, available +memory, and disk capacity, JVM statistics such as garbage collection time, and +heap usage, or node specific metrics such as segment scan time, cache +hit rates, and data ingestion latencies. For each query, Druid nodes can also +emit metrics about the details of the query such as the number of filters +applied, or the interval of data requested. + +Metrics can be emitted from a production Druid cluster into a dedicated metrics +Druid cluster. Queries can be made to the metrics Druid cluster to explore +production cluster performance and stability. Leveraging a dedicated metrics +cluster has allowed us to find numerous production problems, such as gradual +query speed degregations, less than optimally tuned hardware, and various other +system bottlenecks. We also use a metrics cluster to analyze what queries are +made in production. This analysis allows us to determine what our users are +most often doing and we use this information to drive our road map. + +\subsection{Pairing Druid with a Stream Processor} +At the time of writing, Druid can only understand fully denormalized data +streams. In order to provide full business logic in production, Druid can be +paired with a stream processor such as Apache Storm \cite{marz2013storm}. A +Storm topology consumes events from a data stream, retains only those that are +“on-time”, and applies any relevant business logic. This could range from +simple transformations, such as id to name lookups, up to complex operations +such as multi-stream joins. The Storm topology forwards the processed event +stream to Druid in real-time. Storm handles the streaming data processing work, +and Druid is used for responding to queries on top of both real-time and +historical data. + +\subsection{Multiple Data Center Distribution} +Large scale production outages may not only affect single nodes, but entire +data centers as well. The tier configuration in Druid coordinator nodes allow +for segments to be replicated across multiple tiers. Hence, segments can be +exactly replicated across historical nodes in multiple data centers. +Similarily, query preference can be assigned to different tiers. It is possible +to have nodes in one data center act as a primary cluster (and recieve all +queries) and have a redundant cluster in another data center. Such a setup may +be desired if one data center is situated much closer to users. + \section{Related Work} \label{sec:related} Cattell \cite{cattell2011scalable} maintains a great summary about existing @@ -927,62 +1012,14 @@ of functionality as Druid, some of Druid’s optimization techniques such as usi inverted indices to perform fast filters are also used in other data stores \cite{macnicol2004sybase}. -\section{Druid in Production} -Druid is run in production at several organizations and is often part of a more -sophisticated data analytics stack. We've made multiple design decisions to -allow for ease of usability, deployment, and monitoring. - -\subsection{Operational Monitoring} -Each Druid node is designed to periodically emit a set of operational metrics. -These metrics may include system level data such as CPU usage, available -memory, and disk capacity, JVM statistics such as garbage collection time, and -heap usage, or node specific metrics such as segment scan time, cache -hit rates, and data ingestion latencies. For each query, Druid nodes can also -emit metrics about the details of the query such as the number of filters -applied, or the interval of data requested. - -Metrics can be emitted from a production Druid cluster into a dedicated metrics -Druid cluster. Queries can be made to the metrics Druid cluster to explore -production cluster performance and stability. Leveraging a dedicated metrics -cluster has allowed us to find numerous production problems, such as gradual -query speed degregations, less than optimally tuned hardware, and various other -system bottlenecks. We also use a metrics cluster to analyze what queries are -made in production. This analysis allows us to determine what our users are -most often doing and we use this information to drive what optimizations we -should implement. - -\subsection{Pairing Druid with a Stream Processor} -As the time of writing, Druid can only understand fully denormalized data -streams. In order to provide full business logic in production, Druid can be -paired with a stream processor such as Apache Storm \cite{marz2013storm}. A -Storm topology consumes events from a data stream, retains only those that are -“on-time”, and applies any relevant business logic. This could range from -simple transformations, such as id to name lookups, up to complex operations -such as multi-stream joins. The Storm topology forwards the processed event -stream to Druid in real-time. Storm handles the streaming data processing work, -and Druid is used for responding to queries on top of both real-time and -historical data. - -\subsection{Multiple Data Center Distribution} -Large scale production outages may not only affect single nodes, but entire -data centers as well. The tier configuration in Druid coordinator nodes allow -for segments to be replicated across multiple tiers. Hence, segments can be -exactly replicated across historical nodes in multiple data centers. -Similarily, query preference can be assigned to different tiers. It is possible -to have nodes in one data center act as a primary cluster (and recieve all -queries) and have a redundant cluster in another data center. Such a setup may -be desired if one data center is situated much closer to users. - -\section{Conclusions and Future Work} +\section{Conclusions} \label{sec:conclusions} In this paper, we presented Druid, a distributed, column-oriented, real-time analytical data store. Druid is designed to power high performance applications and is optimized for low query latencies. Druid supports streaming data -ingestion and is fault-tolerant. We discussed how Druid was able to -scan 27 billion rows in a second. We summarized key architecture aspects such -as the storage format, query language, and general execution. In the future, we -plan to cover the different algorithms we’ve developed for Druid and how other -systems may plug into Druid in greater detail. +ingestion and is fault-tolerant. We discussed how Druid benchmarks and +summarized key architecture aspects such +as the storage format, query language, and general execution. \balance diff --git a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java index 8ef9dfd193d..b97738aa91a 100644 --- a/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java +++ b/server/src/main/java/io/druid/segment/realtime/DbSegmentPublisher.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.common.logger.Logger; +import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher private final ObjectMapper jsonMapper; private final DbTablesConfig config; private final IDBI dbi; + private final String statement; @Inject public DbSegmentPublisher( @@ -51,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher this.jsonMapper = jsonMapper; this.config = config; this.dbi = dbi; + + if (DbConnector.isPostgreSQL(dbi)) { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } else { + this.statement = String.format( + "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + config.getSegmentsTable() + ); + } } public void publishSegment(final DataSegment segment) throws IOException @@ -82,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher @Override public Void withHandle(Handle handle) throws Exception { - String statement; - if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } else { - statement = String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - config.getSegmentsTable() - ); - } - handle.createStatement(statement) .bind("id", segment.getIdentifier()) .bind("dataSource", segment.getDataSource()) diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java index 6a1599dc4ad..9b2f67c652d 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java @@ -41,7 +41,7 @@ public class IntervalLoadRule extends LoadRule @JsonCreator public IntervalLoadRule( @JsonProperty("interval") Interval interval, - @JsonProperty("load") Map tieredReplicants, + @JsonProperty("tieredReplicants") Map tieredReplicants, // Replicants and tier are deprecated @JsonProperty("replicants") Integer replicants, @JsonProperty("tier") String tier @@ -49,7 +49,6 @@ public class IntervalLoadRule extends LoadRule { this.interval = interval; - if (tieredReplicants != null) { this.tieredReplicants = tieredReplicants; } else { // Backwards compatible @@ -88,4 +87,34 @@ public class IntervalLoadRule extends LoadRule { return interval.contains(segment.getInterval()); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IntervalLoadRule that = (IntervalLoadRule) o; + + if (interval != null ? !interval.equals(that.interval) : that.interval != null) { + return false; + } + if (tieredReplicants != null ? !tieredReplicants.equals(that.tieredReplicants) : that.tieredReplicants != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = interval != null ? interval.hashCode() : 0; + result = 31 * result + (tieredReplicants != null ? tieredReplicants.hashCode() : 0); + return result; + } } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/IntervalLoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/IntervalLoadRuleTest.java new file mode 100644 index 00000000000..becdd0994e1 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/rules/IntervalLoadRuleTest.java @@ -0,0 +1,49 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + + package io.druid.server.coordinator.rules; + + import com.fasterxml.jackson.databind.ObjectMapper; + import com.google.common.collect.ImmutableMap; + import io.druid.client.DruidServer; + import io.druid.jackson.DefaultObjectMapper; + import junit.framework.Assert; + import org.joda.time.Interval; + import org.junit.Test; + + /** + */ + public class IntervalLoadRuleTest + { + @Test + public void testSerde() throws Exception + { + IntervalLoadRule rule = new IntervalLoadRule( + new Interval("0/3000"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, 2), + null, + null + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class); + + Assert.assertEquals(rule, reread); + } + } \ No newline at end of file