Merge branch 'master' into igalDruid

This commit is contained in:
Igal Levy 2014-02-26 17:36:48 -08:00
commit 0721220772
16 changed files with 476 additions and 166 deletions

View File

@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle;
@ -79,7 +80,13 @@ public class ConfigManager
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
this.insertStatement = String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
DbConnector.isPostgreSQL(dbi) ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
configTable
);
}
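MySQL's ON DUPLICATE KEY UPDATE has no direct PostgreSQL equivalent here, so the PostgreSQL branch emulates the upsert with an explicit table lock plus a writable CTE. A minimal sketch of the statement it renders, using the hypothetical table name "druid_config" in place of the value supplied at runtime:

// Sketch only: renders the PostgreSQL upsert built above, with "druid_config"
// standing in for the real table name supplied by DbTablesConfig.
public class ConfigUpsertSketch
{
  public static void main(String[] args)
  {
    final String configTable = "druid_config"; // hypothetical
    final String insertStatement = String.format(
        "BEGIN;\n" +
        "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
        "WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
        " INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert);\n" +
        "COMMIT;",
        configTable
    );
    // %1$s is a positional specifier, so the single configTable argument is
    // substituted at every occurrence of the table name.
    System.out.println(insertStatement);
  }
}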

View File

@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -44,7 +45,11 @@ public class DbConnector
dbi,
segmentTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName
)
);
@ -56,7 +61,10 @@ public class DbConnector
dbi,
ruleTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName
)
);
@ -68,7 +76,9 @@ public class DbConnector
dbi,
configTableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
isPostgreSQL(dbi) ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName
)
);
@ -80,16 +90,27 @@ public class DbConnector
dbi,
taskTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);":
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)
);
@ -101,13 +122,21 @@ public class DbConnector
dbi,
taskLogsTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
)
);
@ -119,13 +148,21 @@ public class DbConnector
dbi,
taskLocksTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
)
);
@ -144,16 +181,20 @@ public class DbConnector
@Override
public Void withHandle(Handle handle) throws Exception
{
if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) {
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
List<Map<String, Object>> table;
if ( isPostgreSQL(dbi) ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
}
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
@ -164,6 +205,25 @@ public class DbConnector
}
}
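// Detect PostgreSQL from the JDBC connection metadata's product name; the IDBI
// overload obtains a temporary handle to perform the check.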
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return isPostgreSQL(handle);
}
}
);
}
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;

View File

@ -82,7 +82,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here",
"leaveIntermediate": "false",
"partitionsSpec": {
"type": "random"
"type": "hashed"
"targetPartitionSize": 5000000
},
"updaterJobSpec": {
@ -147,13 +147,15 @@ The indexing process has the ability to roll data up as it processes the incomin
### Partitioning specification
Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
Druid supports two types of partitions spec - singleDimension and random.
Druid supports two types of partition specs: singleDimension and hashed.
With the singleDimension partition type, data is partitioned based on the values of a chosen dimension.
For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
Random partition type is more efficient and gives better distribution of data.
With the hashed partition type, the number of partitions is determined from the targetPartitionSize and the cardinality of the input set, and the data is partitioned based on the hashcode of each row.
Hashed partitioning is recommended, as it is more efficient than singleDimension: it does not need to determine which dimension to partition on.
Hashing also gives a better distribution of data, resulting in equal-sized partitions and improved query performance.
To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own.
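The new hashed type plugs into the existing partitionsSpec type registration, so the sample spec above deserializes to the HashedPartitionsSpec class added in this change. A trimmed sketch of that round trip (the class name HashedSpecSketch and the plain Jackson mapper are illustrative; Druid's own mapper setup is more involved):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.PartitionsSpec;

public class HashedSpecSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // "hashed" resolves to HashedPartitionsSpec via the @JsonSubTypes
    // registration on the PartitionsSpec interface.
    final PartitionsSpec spec = mapper.readValue(
        "{\"type\": \"hashed\", \"targetPartitionSize\": 5000000}",
        PartitionsSpec.class
    );
    System.out.println(spec.getClass().getSimpleName()
                       + ", targetPartitionSize=" + spec.getTargetPartitionSize());
  }
}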

View File

@ -99,7 +99,9 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
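// Without explicit segment intervals, fall back to a single reducer so one task
// sees every group and can derive the interval set and cardinalities itself.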
if(!config.getSegmentGranularIntervals().isPresent()){
groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
@ -294,7 +296,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
for (BytesWritable value : values) {
HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
try {
aggregate.addAll(logValue);
}
@ -324,6 +326,13 @@ public class DetermineHashedPartitionsJob implements Jobby
}
}
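// BytesWritable.getBytes() returns the full backing buffer, which can be longer
// than the valid data; copy exactly getLength() bytes so the serialized
// HyperLogLog is reconstructed from the correct payload.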
private byte[] getDataBytes(BytesWritable writable)
{
byte[] rv = new byte[writable.getLength()];
System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
return rv;
}
@Override
public void run(Context context)
throws IOException, InterruptedException

View File

@ -44,6 +44,7 @@ import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -416,25 +417,42 @@ public class DeterminePartitionsJob implements Jobby
}
public static class DeterminePartitionsDimSelectionPartitioner
extends Partitioner<BytesWritable, Text>
extends Partitioner<BytesWritable, Text> implements Configurable
{
private Configuration config;
@Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
{
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
final int index = bytes.getInt();
if (index >= numPartitions) {
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
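// When running under the local job runner (mapred.job.tracker=local) the number
// of reducers can be smaller than the shard index, so wrap the index into the
// available partitions instead of failing the job.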
if (config.get("mapred.job.tracker").equals("local")) {
return index % numPartitions;
} else {
if (index >= numPartitions) {
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
}
}
return index;
}
@Override
public Configuration getConf()
{
return config;
}
@Override
public void setConf(Configuration config)
{
this.config = config;
}
}
private static abstract class DeterminePartitionsDimSelectionBaseReducer

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class HashedPartitionsSpec extends AbstractPartitionsSpec
{
@JsonCreator
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}

View File

@ -29,7 +29,8 @@ import io.druid.indexer.Jobby;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class)
})
public interface PartitionsSpec
{

View File

@ -27,7 +27,9 @@ import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class RandomPartitionsSpec extends AbstractPartitionsSpec
// for backward compatibility
@Deprecated
public class RandomPartitionsSpec extends HashedPartitionsSpec
{
@JsonCreator
public RandomPartitionsSpec(
@ -38,10 +40,4 @@ public class RandomPartitionsSpec extends AbstractPartitionsSpec
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@ -501,6 +502,7 @@ public class HadoopDruidIndexerConfigTest
}
}
@Test
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopDruidIndexerConfig cfg;
@ -543,4 +545,48 @@ public class HadoopDruidIndexerConfigTest
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
}
}
@Test
public void testHashedPartitionsSpec() throws Exception{
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"hashed\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof HashedPartitionsSpec);
}
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -179,8 +180,11 @@ public class IndexerDBCoordinator
try {
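// "end" is a reserved word in PostgreSQL, so the column name must be quoted
// there; MySQL accepts it bare.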
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
DbConnector.isPostgreSQL(handle) ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
@ -196,7 +200,9 @@ public class IndexerDBCoordinator
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
} catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
@ -293,11 +299,13 @@ public class IndexerDBCoordinator
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws IOException
public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbTables.getSegmentsTable()
)
)

View File

@ -141,7 +141,7 @@
title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second},
month = {April},
year = {2011},
howpublished = "\url{http://metamarkets.com/2011/druid-part-i-real-time-analytics-at-a-billion-rows-per-second/}"
howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}"
}
@article{farber2012sap,

Binary file not shown.

View File

@ -96,9 +96,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api}. Lastly, we leave off with some
benchmarks in Section \ref{sec:benchmarks}, related work in Section
\ref{sec:related} and conclusions are Section \ref{sec:conclusions}.
query API in Section \ref{sec:query-api} and present our experimental results
in Section \ref{sec:benchmarks}. Lastly, we leave off with our learnings from
running Druid in production in Section \ref{sec:production}, related work
in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}.
\section{Problem Definition}
\label{sec:problem-definition}
@ -139,13 +140,14 @@ want queries over any arbitrary combination of dimensions to return with
sub-second latencies.
The need for Druid was facilitated by the fact that existing open source
Relational Database Management Systems and NoSQL key/value stores were unable
to provide a low latency data ingestion and query platform for interactive
applications \cite{tschetter2011druid}. In the early days of Metamarkets, we
were focused on building a hosted dashboard that would allow users to arbitrarily
explore and visualize event streams. The data store powering the dashboard
needed to return queries fast enough that the data visualizations built on top
of it could provide users with an interactive experience.
Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
unable to provide a low latency data ingestion and query platform for
interactive applications \cite{tschetter2011druid}. In the early days of
Metamarkets, we were focused on building a hosted dashboard that would allow
users to arbitrarily explore and visualize event streams. The data store
powering the dashboard needed to return queries fast enough that the data
visualizations built on top of it could provide users with an interactive
experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. The Metamarkets product is used in a highly concurrent
@ -188,6 +190,7 @@ Figure~\ref{fig:cluster}.
\label{fig:cluster}
\end{figure*}
\newpage
\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query event
@ -670,7 +673,8 @@ ability to handle complex nested filter sets is what enables Druid to drill
into data at any depth.
The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is shown below:
A sample count query over a week of data is as follows:
\newpage
\begin{verbatim}
{
"queryType" : "timeseries",
@ -688,7 +692,6 @@ A sample count query over a week of data is shown below:
} ]
}
\end{verbatim}
The query shown above will return a count of the number of rows in the Wikipedia datasource
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
@ -713,7 +716,6 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array
}
} ]
\end{verbatim}
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and several others. Druid also supports complex aggregations
such as cardinality estimation and approximate quantile estimation. The
@ -723,9 +725,15 @@ filter and group results based on almost any arbitrary condition. It is beyond
the scope of this paper to fully describe the query API but more information
can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
We are also in the process of extending the Druid API to understand SQL.
At the time of writing, the query language does not support joins. Although the
storage format is able to support joins, we've targeted Druid at user-facing
workloads that must return in a matter of seconds, and as such, we've chosen
not to spend the time to implement joins; in our experience, requiring joins
in queries often limits the performance you can achieve.
Implementing joins and extending the Druid API to understand SQL is something
we'd like to do in future work.
\section{Performance Benchmarks}
\section{Experimental Results}
\label{sec:benchmarks}
To illustrate Druid's performance, we conducted a series of experiments that
focused on measuring Druid's query and data ingestion capabilities.
@ -768,11 +776,15 @@ Please note:
1 & \texttt{SELECT count(*) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
2 & \texttt{SELECT count(*), sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
3 & \texttt{SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
5 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
6 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
\end{tabular}
\end{table*}
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_
WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER
BY cnt limit 100} \\ \hline 5 & \texttt{SELECT high\_card\_dimension, count(*)
AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?
GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline 6 &
\texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1),
sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$
? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\
\hline \end{tabular} \end{table*}
Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and
Figure~\ref{fig:core_scan_rate} shows the core scan rate. In
@ -787,20 +799,12 @@ remain nearly constant. The increase in speed of a parallel computing system
is often limited by the time needed for the sequential operations of the
system, in accordance with Amdahl's law \cite{amdahl1967validity}.
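Recall that if a fraction $p$ of a computation can be parallelized across $N$
processors, Amdahl's law bounds the overall speedup by
\begin{equation*}
S(N) \leq \frac{1}{(1 - p) + p/N},
\end{equation*}
so the serial fraction $1 - p$ increasingly dominates as nodes are added.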
\begin{figure}
\centering
\includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling
from 25 nodes.}
\label{fig:cluster_scan_rate}
\end{figure}
\begin{figure} \centering \includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling from 25
nodes.} \label{fig:cluster_scan_rate} \end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.}
\label{fig:core_scan_rate}
\end{figure}
\begin{figure} \centering \includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.} \label{fig:core_scan_rate} \end{figure}
The first query listed in Table~\ref{tab:sql_queries} is a simple
count, achieving scan rates of 33M rows/second/core. We believe
@ -878,6 +882,87 @@ than the number of dimensions.}
\label{fig:throughput_vs_num_metrics}
\end{figure}
\section{Druid in Production}
\label{sec:production}
Over the last few years of using Druid, we've gained tremendous
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
sophisticated data analytics stack, and distributing data to handle entire data
center outages. One of the most important lessons we've learned is that no
amount of testing can accurately simulate a production environment, and failures
will occur for every imaginable and unimaginable reason. Interestingly, many of
our most severe crashes were due to misunderstanding the impact a
seemingly small feature would have on the overall system.
Some of our more interesting observations include:
\begin{itemize}
\item Druid is most often used in production to power exploratory dashboards.
Interestingly, because many users of exploratory dashboards are not from
technical backgrounds, they often issue queries without understanding the
impacts to the underlying system. For example, some users become impatient that
their queries for terabytes of data do not return in milliseconds and
continuously refresh their dashboard view, generating heavy load on Druid. This
type of usage forced Druid to better defend itself against expensive repetitive
queries.
\item Cluster query performance benefits from multitenancy. Hosting every
production datasource in the same cluster leads to better data parallelization
as additional nodes are added.
\item Even if you provide users with the ability to arbitrarily explore data, they
often only have a few questions in mind. Caching is extremely important, and in
fact we see a very high percentage of our query results come from the broker cache.
\item When using a memory-mapped storage engine, even a small amount of paging
data from disk can severely impact query performance. SSDs greatly mitigate
this problem.
\item Leveraging approximate algorithms can greatly reduce data storage costs and
improve query performance. Many users do not care about exact answers to their
questions and are comfortable with a few percentage points of error.
\end{itemize}
\subsection{Operational Monitoring}
Proper monitoring is critical to run a large scale distributed cluster.
Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system-level data such as CPU usage, available
memory, and disk capacity; JVM statistics such as garbage collection time and
heap usage; or node-specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also
emit metrics about the details of the query such as the number of filters
applied, or the interval of data requested.
Metrics can be emitted from a production Druid cluster into a dedicated metrics
Druid cluster. Queries can be made to the metrics Druid cluster to explore
production cluster performance and stability. Leveraging a dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less-than-optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are
most often doing and we use this information to drive our road map.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}. A
Storm topology consumes events from a data stream, retains only those that are
“on-time”, and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries on top of both real-time and
historical data.
\subsection{Multiple Data Center Distribution}
Large scale production outages may not only affect single nodes, but entire
data centers as well. The tier configuration in Druid coordinator nodes allows
for segments to be replicated across multiple tiers. Hence, segments can be
exactly replicated across historical nodes in multiple data centers.
Similarly, query preference can be assigned to different tiers. It is possible
to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary about existing
@ -927,62 +1012,14 @@ of functionality as Druid, some of Druids optimization techniques such as usi
inverted indices to perform fast filters are also used in other data
stores \cite{macnicol2004sybase}.
\section{Druid in Production}
Druid is run in production at several organizations and is often part of a more
sophisticated data analytics stack. We've made multiple design decisions to
allow for ease of usability, deployment, and monitoring.
\subsection{Operational Monitoring}
Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system level data such as CPU usage, available
memory, and disk capacity, JVM statistics such as garbage collection time, and
heap usage, or node specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also
emit metrics about the details of the query such as the number of filters
applied, or the interval of data requested.
Metrics can be emitted from a production Druid cluster into a dedicated metrics
Druid cluster. Queries can be made to the metrics Druid cluster to explore
production cluster performance and stability. Leveraging a dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less-than-optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are
most often doing and we use this information to drive what optimizations we
should implement.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}. A
Storm topology consumes events from a data stream, retains only those that are
“on-time”, and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries on top of both real-time and
historical data.
\subsection{Multiple Data Center Distribution}
Large scale production outages may not only affect single nodes, but entire
data centers as well. The tier configuration in Druid coordinator nodes allows
for segments to be replicated across multiple tiers. Hence, segments can be
exactly replicated across historical nodes in multiple data centers.
Similarly, query preference can be assigned to different tiers. It is possible
to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Conclusions and Future Work}
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed how Druid was able to
scan 27 billion rows in a second. We summarized key architecture aspects such
as the storage format, query language, and general execution. In the future, we
plan to cover the different algorithms we've developed for Druid and how other
systems may plug into Druid in greater detail.
ingestion and is fault-tolerant. We discussed Druid's performance benchmarks and
summarized key architecture aspects such
as the storage format, query language, and general execution.
\balance

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher
private final ObjectMapper jsonMapper;
private final DbTablesConfig config;
private final IDBI dbi;
private final String statement;
@Inject
public DbSegmentPublisher(
@ -51,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher
this.jsonMapper = jsonMapper;
this.config = config;
this.dbi = dbi;
if (DbConnector.isPostgreSQL(dbi)) {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
}
public void publishSegment(final DataSegment segment) throws IOException
@ -82,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher
@Override
public Void withHandle(Handle handle) throws Exception
{
String statement;
if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
handle.createStatement(statement)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())

View File

@ -41,7 +41,7 @@ public class IntervalLoadRule extends LoadRule
@JsonCreator
public IntervalLoadRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("load") Map<String, Integer> tieredReplicants,
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
// Replicants and tier are deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
@ -49,7 +49,6 @@ public class IntervalLoadRule extends LoadRule
{
this.interval = interval;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
@ -88,4 +87,34 @@ public class IntervalLoadRule extends LoadRule
{
return interval.contains(segment.getInterval());
}
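// Value-based equals/hashCode so a rule can be compared with its serde round
// trip (exercised by the new IntervalLoadRuleTest).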
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalLoadRule that = (IntervalLoadRule) o;
if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
return false;
}
if (tieredReplicants != null ? !tieredReplicants.equals(that.tieredReplicants) : that.tieredReplicants != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = interval != null ? interval.hashCode() : 0;
result = 31 * result + (tieredReplicants != null ? tieredReplicants.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class IntervalLoadRuleTest
{
@Test
public void testSerde() throws Exception
{
IntervalLoadRule rule = new IntervalLoadRule(
new Interval("0/3000"),
ImmutableMap.<String, Integer>of(DruidServer.DEFAULT_TIER, 2),
null,
null
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
Assert.assertEquals(rule, reread);
}
}