Merge branch 'master' into new-schema

Conflicts:
	indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
	pom.xml
	publications/whitepaper/druid.pdf
	publications/whitepaper/druid.tex
This commit is contained in:
fjy 2014-03-03 14:48:15 -08:00
commit 46b9ac78e7
59 changed files with 770 additions and 245 deletions

Binary file not shown.

Binary file not shown.

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,6 +28,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle;
@ -79,7 +80,13 @@ public class ConfigManager
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
this.insertStatement = String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
DbConnector.isPostgreSQL(dbi) ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
configTable
);
}

View File

@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -44,7 +45,11 @@ public class DbConnector
dbi,
segmentTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName
)
);
@ -56,7 +61,10 @@ public class DbConnector
dbi,
ruleTableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName
)
);
@ -68,7 +76,9 @@ public class DbConnector
dbi,
configTableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
isPostgreSQL(dbi) ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName
)
);
@ -80,16 +90,27 @@ public class DbConnector
dbi,
taskTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);":
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)
);
@ -101,13 +122,21 @@ public class DbConnector
dbi,
taskLogsTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
)
);
@ -119,13 +148,21 @@ public class DbConnector
dbi,
taskLocksTableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
isPostgreSQL(dbi) ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
)
);
@ -144,16 +181,20 @@ public class DbConnector
@Override
public Void withHandle(Handle handle) throws Exception
{
if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) {
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
List<Map<String, Object>> table;
if ( isPostgreSQL(dbi) ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
}
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
@ -164,6 +205,25 @@ public class DbConnector
}
}
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return isPostgreSQL(handle);
}
}
);
}
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;

View File

@ -82,7 +82,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here",
"leaveIntermediate": "false",
"partitionsSpec": {
"type": "random"
"type": "hashed"
"targetPartitionSize": 5000000
},
"metadataUpdateSpec": {
@ -147,13 +147,15 @@ The indexing process has the ability to roll data up as it processes the incomin
### Partitioning specification
Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
Druid supports two types of partitions spec - singleDimension and random.
Druid supports two types of partitions spec - singleDimension and hashed.
In SingleDimension partition type data is partitioned based on the values in that dimension.
For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
Random partition type is more efficient and gives better distribution of data.
In hashed partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
It is recommended to use Hashed partition as it is more efficient than singleDimension since it does not need to determine the dimension for creating partitions.
Hashing also gives better distribution of data resulting in equal sized partitons and improving query performance
To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own.

View File

@ -14,6 +14,8 @@ Before we start digging into how to query Druid, make sure you've gone through t
Let's start up a simple Druid cluster so we can query all the things.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
To start a Coordinator node:
```bash

View File

@ -66,6 +66,8 @@ There are five data points spread across the day of 2013-08-31. Talk about big d
In order to ingest and query this data, we are going to need to run a historical node, a coordinator node, and an indexing service to run the batch ingestion.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
#### Starting a Local Indexing Service
The simplest indexing service we can start up is to run an [overlord](Indexing-Service.html) node in local mode. You can do so by issuing:

View File

@ -45,9 +45,9 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.61/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
[KafkaFirehoseFactory](Firehose.html) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
The following quick-start instructions for booting a Zookeeper and then Kafka cluster were taken from the [Kafka website](http://kafka.apache.org/07/quickstart.html).
1. Download Apache Kafka 0.7.2 from [http://kafka.apache.org/downloads.html](http://kafka.apache.org/downloads.html)
@ -205,11 +205,11 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node shou
Batch Ingestion
---------------
Druid is designed for large data volumes, and most real-world data sets require batch indexing be done through a Hadoop job.
Druid is designed for large data volumes, and most real-world data sets require batch indexing be done through a Hadoop job.
The setup for a single node, 'standalone' Hadoop cluster is available [here](http://hadoop.apache.org/docs/stable/single_node_setup.html).
For this tutorial, we used [Hadoop 1.0.3](https://archive.apache.org/dist/hadoop/core/hadoop-1.0.3/). There are many pages on the Internet showing how to set up a single-node (standalone) Hadoop cluster, which is all that's needed for this example.
For the purposes of this tutorial, we are going to use our very small and simple Wikipedia data set. This data can directly be ingested via other means as shown in the previous [tutorial](Tutorial%3A-Loading-Your-Data-Part-1), but we are going to use Hadoop here for demonstration purposes.
For the purposes of this tutorial, we are going to use our very small and simple Wikipedia data set. This data can directly be ingested via other means as shown in the previous [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html), but we are going to use Hadoop here for demonstration purposes.
Our data is located at:
@ -227,10 +227,12 @@ The following events should exist in the file:
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
#### Setup a Druid Cluster
#### Set Up a Druid Cluster
To index the data, we are going to need an indexing service, a historical node, and a coordinator node.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
To start the Indexing Service:
```bash
@ -305,7 +307,7 @@ Examining the contents of the file, you should find:
}
```
If you are curious about what all this configuration means, see [here](Task.html).
If you are curious about what all this configuration means, see [here](Tasks.html).
To submit the task:

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -99,7 +99,9 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
if(!config.getSegmentGranularIntervals().isPresent()){
groupByJob.setNumReduceTasks(1);
}
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
@ -303,7 +305,7 @@ public class DetermineHashedPartitionsJob implements Jobby
{
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
for (BytesWritable value : values) {
HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
try {
aggregate.addAll(logValue);
}
@ -333,6 +335,13 @@ public class DetermineHashedPartitionsJob implements Jobby
}
}
private byte[] getDataBytes(BytesWritable writable)
{
byte[] rv = new byte[writable.getLength()];
System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
return rv;
}
@Override
public void run(Context context)
throws IOException, InterruptedException

View File

@ -44,6 +44,7 @@ import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -419,25 +420,42 @@ public class DeterminePartitionsJob implements Jobby
}
public static class DeterminePartitionsDimSelectionPartitioner
extends Partitioner<BytesWritable, Text>
extends Partitioner<BytesWritable, Text> implements Configurable
{
private Configuration config;
@Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
{
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
final int index = bytes.getInt();
if (index >= numPartitions) {
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
if (config.get("mapred.job.tracker").equals("local")) {
return index % numPartitions;
} else {
if (index >= numPartitions) {
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
}
}
return index;
}
@Override
public Configuration getConf()
{
return config;
}
@Override
public void setConf(Configuration config)
{
this.config = config;
}
}
private static abstract class DeterminePartitionsDimSelectionBaseReducer

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class HashedPartitionsSpec extends AbstractPartitionsSpec
{
@JsonCreator
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}

View File

@ -29,7 +29,8 @@ import io.druid.indexer.Jobby;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class)
})
public interface PartitionsSpec
{

View File

@ -27,7 +27,9 @@ import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class RandomPartitionsSpec extends AbstractPartitionsSpec
// for backward compatibility
@Deprecated
public class RandomPartitionsSpec extends HashedPartitionsSpec
{
@JsonCreator
public RandomPartitionsSpec(
@ -38,10 +40,4 @@ public class RandomPartitionsSpec extends AbstractPartitionsSpec
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}

View File

@ -21,6 +21,8 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@ -78,10 +80,10 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
HadoopIngestionSchema schema;
final HadoopDruidIndexerConfig cfg;
try {
schema = jsonReadWriteRead(
cfg = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
@ -91,16 +93,15 @@ public class HadoopDruidIndexerConfigTest
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopIngestionSchema.class
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withDriverConfig(schema.getDriverConfig().withVersion("some:brand:new:version"))
);
cfg.setVersion("some:brand:new:version");
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
Assert.assertEquals(
@ -120,4 +121,46 @@ public class HadoopDruidIndexerConfigTest
}
}
public void testRandomPartitionsSpec() throws Exception{
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"type\":\"random\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
}
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -0,0 +1,110 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
public class RestoreTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(RestoreTask.class);
public RestoreTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
TaskUtils.makeId(id, "restore", dataSource, interval),
dataSource,
interval
);
}
@Override
public String getType()
{
return "restore";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if (!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
unusedSegment.getVersion(),
myLock.getVersion()
);
}
log.info("OK to restore segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> restoredSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
restoredSegments.add(toolbox.getDataSegmentArchiver().restore(segment));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(
new SegmentMetadataUpdateAction(
ImmutableSet.copyOf(restoredSegments)
)
);
return TaskStatus.success(getId());
}
}

View File

@ -47,6 +47,7 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -179,8 +180,11 @@ public class IndexerDBCoordinator
try {
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
DbConnector.isPostgreSQL(handle) ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
@ -196,7 +200,9 @@ public class IndexerDBCoordinator
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
} catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
@ -293,11 +299,13 @@ public class IndexerDBCoordinator
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws IOException
public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbTables.getSegmentsTable()
)
)

View File

@ -340,6 +340,31 @@ public class TaskSerdeTest
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testRestoreTaskSerde() throws Exception
{
final RestoreTask task = new RestoreTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final RestoreTask task2 = (RestoreTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testMoveTaskSerde() throws Exception
{

View File

@ -176,6 +176,12 @@ public class TaskLifecycleTest
{
return segment;
}
@Override
public DataSegment restore(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
},
null, // segment announcer
null, // new segment server view

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -31,6 +31,7 @@ import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
@ -66,12 +67,10 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
// Backwards Compatible
if (parseSpec == null) {
this.parseSpec = new ParseSpec(
this.parseSpec = new JSONParseSpec(
timestampSpec,
new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions)
)
{
};
);
} else {
this.parseSpec = parseSpec;
}

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
@ -34,10 +35,10 @@ import java.util.Map;
*/
public abstract class BaseQuery<T> implements Query<T>
{
public static String QUERYID = "queryId";
private final String dataSource;
private final Map<String, String> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
public BaseQuery(
@ -130,4 +131,16 @@ public abstract class BaseQuery<T> implements Query<T>
return overridden;
}
@Override
public String getId()
{
return getContextValue(QUERYID);
}
@Override
public Query withId(String id)
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
}
}

View File

@ -77,4 +77,8 @@ public interface Query<T>
public Query<T> withOverriddenContext(Map<String, String> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
public Query<T> withId(String id);
public String getId();
}

View File

@ -131,7 +131,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -151,7 +151,8 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -65,11 +65,13 @@ import java.util.Set;
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{
private static final byte SEARCH_QUERY = 0x2;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>(){};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final SearchQueryConfig config;
@Inject
@ -123,7 +125,8 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
.setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override
@ -261,6 +264,11 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
);
}
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class SearchThresholdAdjustingQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private final QueryRunner<Result<SearchResultValue>> runner;
@ -269,7 +277,8 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public SearchThresholdAdjustingQueryRunner(
QueryRunner<Result<SearchResultValue>> runner,
SearchQueryConfig config
) {
)
{
this.runner = runner;
this.config = config;
}
@ -341,9 +350,4 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
);
}
}
public Ordering<Result<SearchResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -60,18 +60,15 @@ import java.util.Set;
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
};
private static final TypeReference<Result<SelectResultValue>> TYPE_REFERENCE =
new TypeReference<Result<SelectResultValue>>()
{
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;
@ -130,7 +127,8 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -119,7 +119,8 @@ public class TimeBoundaryQueryQueryToolChest
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser4(query.getType())
.setUser6("false");
.setUser6("false")
.setUser10(query.getId());
}
@Override

View File

@ -61,14 +61,15 @@ import java.util.Map;
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final byte TIMESERIES_QUERY = 0x0;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>(){};
new TypeReference<Object>()
{
};
private static final TypeReference<Result<TimeseriesResultValue>> TYPE_REFERENCE =
new TypeReference<Result<TimeseriesResultValue>>() {};
new TypeReference<Result<TimeseriesResultValue>>()
{
};
private final QueryConfig config;
@Inject
@ -127,7 +128,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -133,7 +133,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
.setUser9(Minutes.minutes(numMinutes).toString())
.setUser10(query.getId());
}
@Override

View File

@ -20,6 +20,7 @@
package io.druid.data.input;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
@ -57,12 +58,10 @@ public class ProtoBufInputRowParserTest
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
new ParseSpec(
new JSONParseSpec(
new TimestampSpec("timestamp", "iso"),
new DimensionsSpec(Arrays.asList(DIMENSIONS), Arrays.<String>asList(), null)
)
{
},
),
"prototest.desc",
null, null, null, null
);

View File

@ -170,7 +170,7 @@ public class TestIndex
"\t",
Arrays.asList(COLUMNS)
),
null, null, null, null
null, null, null
);
boolean runOnce = false;
int lineCount = 0;

View File

@ -141,7 +141,7 @@
title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second},
month = {April},
year = {2011},
howpublished = "\url{http://metamarkets.com/2011/druid-part-i-real-time-analytics-at-a-billion-rows-per-second/}"
howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}"
}
@article{farber2012sap,

Binary file not shown.

View File

@ -96,9 +96,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api}. Lastly, we leave off with some
benchmarks in Section \ref{sec:benchmarks}, related work in Section
\ref{sec:related} and conclusions are Section \ref{sec:conclusions}.
query API in Section \ref{sec:query-api} and present our experimental results
in Section \ref{sec:benchmarks}. Lastly, we leave off with our learnings from
running Druid in production in Section \ref{sec:production}, related work
in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}.
\section{Problem Definition}
\label{sec:problem-definition}
@ -139,13 +140,14 @@ want queries over any arbitrary combination of dimensions to return with
sub-second latencies.
The need for Druid was facilitated by the fact that existing open source
Relational Database Management Systems and NoSQL key/value stores were unable
to provide a low latency data ingestion and query platform for interactive
applications \cite{tschetter2011druid}. In the early days of Metamarkets, we
were focused on building a hosted dashboard that would allow users to arbitrary
explore and visualize event streams. The data store powering the dashboard
needed to return queries fast enough that the data visualizations built on top
of it could provide users with an interactive experience.
Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
unable to provide a low latency data ingestion and query platform for
interactive applications \cite{tschetter2011druid}. In the early days of
Metamarkets, we were focused on building a hosted dashboard that would allow
users to arbitrary explore and visualize event streams. The data store
powering the dashboard needed to return queries fast enough that the data
visualizations built on top of it could provide users with an interactive
experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. The Metamarkets product is used in a highly concurrent
@ -188,6 +190,7 @@ Figure~\ref{fig:cluster}.
\label{fig:cluster}
\end{figure*}
\newpage
\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query event
@ -670,7 +673,8 @@ ability to handle complex nested filter sets is what enables Druid to drill
into data at any depth.
The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is shown below:
A sample count query over a week of data is as follows:
\newpage
\begin{verbatim}
{
"queryType" : "timeseries",
@ -688,7 +692,6 @@ A sample count query over a week of data is shown below:
} ]
}
\end{verbatim}
The query shown above will return a count of the number of rows in the Wikipedia datasource
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
@ -713,7 +716,6 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array
}
} ]
\end{verbatim}
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and several others. Druid also supports complex aggregations
such as cardinality estimation and approximate quantile estimation. The
@ -723,9 +725,15 @@ filter and group results based on almost any arbitrary condition. It is beyond
the scope of this paper to fully describe the query API but more information
can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
We are also in the process of extending the Druid API to understand SQL.
At the time of writing, the query language does not support joins. Although the
storage format is able to support joins, we've targeted Druid at user-facing
workloads that must return in a matter of seconds, and as such, we've chosen to
not spend the time to implement joins as it has been our experience that
requiring joins on your queries often limits the performance you can achieve.
Implemting joins and extending the Druid API to understand SQL is something
we'd like to do in future work.
\section{Performance Benchmarks}
\section{Experimental Results}
\label{sec:benchmarks}
To illustrate Druid's performance, we conducted a series of experiments that
focused on measuring Druid's query and data ingestion capabilities.
@ -768,11 +776,15 @@ Please note:
1 & \texttt{SELECT count(*) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
2 & \texttt{SELECT count(*), sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
3 & \texttt{SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
5 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
6 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
\end{tabular}
\end{table*}
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_
WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER
BY cnt limit 100} \\ \hline 5 & \texttt{SELECT high\_card\_dimension, count(*)
AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?
GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline 6 &
\texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1),
sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$
? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\
\hline \end{tabular} \end{table*}
Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and
Figure~\ref{fig:core_scan_rate} shows the core scan rate. In
@ -787,20 +799,12 @@ remain nearly constant. The increase in speed of a parallel computing system
is often limited by the time needed for the sequential operations of the
system, in accordance with Amdahl's law \cite{amdahl1967validity}.
\begin{figure}
\centering
\includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling
from 25 nodes.}
\label{fig:cluster_scan_rate}
\end{figure}
\begin{figure} \centering \includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling from 25
nodes.} \label{fig:cluster_scan_rate} \end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.}
\label{fig:core_scan_rate}
\end{figure}
\begin{figure} \centering \includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.} \label{fig:core_scan_rate} \end{figure}
The first query listed in Table~\ref{tab:sql_queries} is a simple
count, achieving scan rates of 33M rows/second/core. We believe
@ -878,56 +882,8 @@ than the number of dimensions.}
\label{fig:throughput_vs_num_metrics}
\end{figure}
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary about existing
Scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another
great summary for streaming databases. Druid feature-wise sits somewhere
between Googles Dremel \cite{melnik2010dremel} and PowerDrill
\cite{hall2012processing}. Druid has most of the features implemented in Dremel
(Dremel handles arbitrary nested data structures while Druid only allows for a
single level of array-based nesting) and many of the interesting compression
algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same of the data warehousing issues that Druid
is meant to solve. These systems include include in-memory databases such as
SAPs HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}, however,
Druid allows system wide rolling software updates with no downtime.
Druid is similiar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
two subsystems, a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in real-time nodes. Real-time nodes are designed to
ingest a high volume of append heavy data, and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
\cite{zaharia2012discretized}, however, both systems are focused on stream
processing whereas Druid is focused on ingestion and aggregation. Stream
processors are great complements to Druid as a means of pre-processing the data
before the data enters Druid.
There are a class of systems that specialize in queries on top of cluster
computing frameworks. Shark \cite{engle2012shark} is such a system for queries
on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system
focused on optimizing query performance on top of HDFS. Druid historical nodes
download data locally and only work with native Druid indexes. We believe this
setup allows for faster query latencies.
Druid leverages a unique combination of algorithms in its
architecture. Although we believe no other data store has the same set
of functionality as Druid, some of Druids optimization techniques such as using
inverted indices to perform fast filters are also used in other data
stores \cite{macnicol2004sybase}.
\section{Druid in Production}
\label{sec:production}
Over the last few years of using Druid, we've gained tremendous
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
@ -1007,7 +963,56 @@ to have nodes in one data center act as a primary cluster (and recieve all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Conclusions and Future Work}
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary about existing
Scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another
great summary for streaming databases. Druid feature-wise sits somewhere
between Googles Dremel \cite{melnik2010dremel} and PowerDrill
\cite{hall2012processing}. Druid has most of the features implemented in Dremel
(Dremel handles arbitrary nested data structures while Druid only allows for a
single level of array-based nesting) and many of the interesting compression
algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same of the data warehousing issues that Druid
is meant to solve. These systems include include in-memory databases such as
SAPs HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}, however,
Druid allows system wide rolling software updates with no downtime.
Druid is similiar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
two subsystems, a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in real-time nodes. Real-time nodes are designed to
ingest a high volume of append heavy data, and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
\cite{zaharia2012discretized}, however, both systems are focused on stream
processing whereas Druid is focused on ingestion and aggregation. Stream
processors are great complements to Druid as a means of pre-processing the data
before the data enters Druid.
There are a class of systems that specialize in queries on top of cluster
computing frameworks. Shark \cite{engle2012shark} is such a system for queries
on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system
focused on optimizing query performance on top of HDFS. Druid historical nodes
download data locally and only work with native Druid indexes. We believe this
setup allows for faster query latencies.
Druid leverages a unique combination of algorithms in its
architecture. Although we believe no other data store has the same set
of functionality as Druid, some of Druids optimization techniques such as using
inverted indices to perform fast filters are also used in other data
stores \cite{macnicol2004sybase}.
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
@ -1016,11 +1021,6 @@ ingestion and is fault-tolerant. We discussed how Druid benchmarks and
summarized key architecture aspects such
as the storage format, query language, and general execution.
In the future, we plan to extend the Druid query language to support full SQL.
Doing so will require joins, a feature we've held off on implementing because
we do our joins at the data processing layer. We are also interested in
exploring more flexible data ingestion and support for less structured data.
\balance
\section{Acknowledgements}

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -29,23 +29,41 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service;
public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver
{
private final S3DataSegmentArchiverConfig config;
private final S3DataSegmentArchiverConfig archiveConfig;
private final S3DataSegmentPusherConfig restoreConfig;
@Inject
public S3DataSegmentArchiver(
RestS3Service s3Client,
S3DataSegmentArchiverConfig config
S3DataSegmentArchiverConfig archiveConfig,
S3DataSegmentPusherConfig restoreConfig
)
{
super(s3Client);
this.config = config;
this.archiveConfig = archiveConfig;
this.restoreConfig = restoreConfig;
}
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
String targetS3Bucket = config.getArchiveBucket();
String targetS3BaseKey = config.getArchiveBaseKey();
String targetS3Bucket = archiveConfig.getArchiveBucket();
String targetS3BaseKey = archiveConfig.getArchiveBaseKey();
return move(
segment,
ImmutableMap.<String, Object>of(
"bucket", targetS3Bucket,
"baseKey", targetS3BaseKey
)
);
}
@Override
public DataSegment restore(DataSegment segment) throws SegmentLoadingException
{
String targetS3Bucket = restoreConfig.getBucket();
String targetS3BaseKey = restoreConfig.getBaseKey();
return move(
segment,

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -43,6 +43,12 @@ public class OmniDataSegmentArchiver implements DataSegmentArchiver
return getArchiver(segment).archive(segment);
}
@Override
public DataSegment restore(DataSegment segment) throws SegmentLoadingException
{
return getArchiver(segment).restore(segment);
}
private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -40,6 +41,7 @@ public class DbSegmentPublisher implements SegmentPublisher
private final ObjectMapper jsonMapper;
private final DbTablesConfig config;
private final IDBI dbi;
private final String statement;
@Inject
public DbSegmentPublisher(
@ -51,6 +53,20 @@ public class DbSegmentPublisher implements SegmentPublisher
this.jsonMapper = jsonMapper;
this.config = config;
this.dbi = dbi;
if (DbConnector.isPostgreSQL(dbi)) {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
}
public void publishSegment(final DataSegment segment) throws IOException
@ -82,21 +98,6 @@ public class DbSegmentPublisher implements SegmentPublisher
@Override
public Void withHandle(Handle handle) throws Exception
{
String statement;
if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
} else {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentsTable()
);
}
handle.createStatement(statement)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())

View File

@ -240,7 +240,7 @@ public class RealtimePlumber implements Plumber
}
)
)
),
).withWaitMeasuredFromNow(),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.druid.guice.annotations.Self;
import io.druid.query.Query;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicLong;
@Singleton
public class QueryIDProvider
{
private final String host;
private final AtomicLong id = new AtomicLong();
@Inject
public QueryIDProvider(@Self DruidNode node)
{
host = node.getHost();
}
public String next(Query query)
{
return String.format(
"%s_%s_%s_%s_%s",
query.getDataSource(),
query.getDuration(),
host,
new DateTime(),
id.incrementAndGet()
);
}
}

View File

@ -57,12 +57,12 @@ public class QueryResource
{
private static final Logger log = new Logger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIDProvider idProvider;
@Inject
public QueryResource(
@ -70,7 +70,8 @@ public class QueryResource
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger
RequestLogger requestLogger,
QueryIDProvider idProvider
)
{
this.jsonMapper = jsonMapper;
@ -78,6 +79,7 @@ public class QueryResource
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@POST
@ -88,9 +90,9 @@ public class QueryResource
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
byte[] requestQuery = null;
String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType());
@ -103,6 +105,11 @@ public class QueryResource
try {
requestQuery = ByteStreams.toByteArray(req.getInputStream());
query = objectMapper.readValue(requestQuery, Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = idProvider.next(query);
query = query.withId(queryId);
}
requestLogger.log(
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
@ -130,6 +137,7 @@ public class QueryResource
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.setUser10(queryId)
.build("request/time", requestTime)
);
}

View File

@ -41,7 +41,7 @@ public class IntervalLoadRule extends LoadRule
@JsonCreator
public IntervalLoadRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("load") Map<String, Integer> tieredReplicants,
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
// Replicants and tier are deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
@ -49,7 +49,6 @@ public class IntervalLoadRule extends LoadRule
{
this.interval = interval;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
@ -88,4 +87,34 @@ public class IntervalLoadRule extends LoadRule
{
return interval.contains(segment.getInterval());
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalLoadRule that = (IntervalLoadRule) o;
if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
return false;
}
if (tieredReplicants != null ? !tieredReplicants.equals(that.tieredReplicants) : that.tieredReplicants != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = interval != null ? interval.hashCode() : 0;
result = 31 * result + (tieredReplicants != null ? tieredReplicants.hashCode() : 0);
return result;
}
}

View File

@ -63,7 +63,7 @@ public class FireDepartmentTest
null
)
),
null, null, null, null
null, null, null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count")

View File

@ -31,6 +31,7 @@ import io.druid.client.ServerView;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
@ -93,9 +94,7 @@ public class RealtimePlumberSchoolTest
@Override
public ParseSpec getParseSpec()
{
return new ParseSpec(new TimestampSpec("timestamp", "auto"), new DimensionsSpec(null, null, null))
{
};
return new JSONParseSpec(new TimestampSpec("timestamp", "auto"), new DimensionsSpec(null, null, null));
}
@Override

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class IntervalLoadRuleTest
{
@Test
public void testSerde() throws Exception
{
IntervalLoadRule rule = new IntervalLoadRule(
new Interval("0/3000"),
ImmutableMap.<String, Integer>of(DruidServer.DEFAULT_TIER, 2),
null,
null
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
Assert.assertEquals(rule, reread);
}
}

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.62-SNAPSHOT</version>
<version>0.6.63-SNAPSHOT</version>
</parent>
<dependencies>