Merge branch 'master' into hadoop-version-update

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -215,7 +215,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
       "type" : "static",
       "paths" : "data.json"
     },
-    "targetPartitionSi:qze" : 5000000,
+    "targetPartitionSize" : 5000000,
     "rollupSpec" : {
       "aggs": [{
           "type" : "count",

@@ -36,8 +36,6 @@ druid.processing.numThreads=1
 druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
 ```

-Note: This will spin up a Historical node with the local filesystem as deep storage.
-
 Production Configs
 ------------------
 These production configs are using S3 as a deep store.

@@ -50,11 +50,11 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
 |property|description|required?|
 |--------|-----------|---------|
 |type|The task type, this should always be "index".|yes|
-|id|The task ID.|no|
+|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
 |granularitySpec|Specifies the segment chunks that the task will process. `type` is always "uniform"; `gran` sets the granularity of the chunks ("DAY" means all segments containing timestamps in the same day), while `intervals` sets the interval that the chunks will cover.|yes|
 |spatialDimensions|Dimensions to build spatial indexes over. See [Geographic Queries](GeographicQueries.html).|no|
 |aggregators|The metrics to aggregate in the data set. For more info, see [Aggregations](Aggregations.html)|yes|
-|indexGranularity|The rollup granularity for timestamps.|no|
+|indexGranularity|The rollup granularity for timestamps. See [Realtime Ingestion](Realtime-ingestion.html) for more information. |no|
 |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no|
 |firehose|The input source of data. For more info, see [Firehose](Firehose.html)|yes|
 |rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no|

@@ -163,7 +163,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
 |availabilityGroup|String|A uniqueness identifier for the task. Tasks with the same availability group will always run on different middle managers. Used mainly for replication. |yes|
 |requiredCapacity|Integer|How much middle manager capacity this task will take.|yes|

-For schema, fireDepartmentConfig, windowPeriod, segmentGranularity, and rejectionPolicy, see the [realtime-ingestion doc](Realtime-ingestion.html). For firehose configuration, see [Firehose](Firehose.html).
+For schema, fireDepartmentConfig, windowPeriod, segmentGranularity, and rejectionPolicy, see [Realtime Ingestion](Realtime-ingestion.html). For firehose configuration, see [Firehose](Firehose.html).


 Segment Merging Tasks

@@ -175,6 +175,7 @@ Append tasks append a list of segments together into a single segment (one after

+```json
 {
     "type": "append",
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "segments": <JSON list of DataSegment objects to append>

@@ -187,6 +188,7 @@ Merge tasks merge a list of segments together. Any common timestamps are merged.

+```json
 {
     "type": "merge",
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "segments": <JSON list of DataSegment objects to append>

@@ -202,6 +204,7 @@ Delete tasks create empty segments with no data. The grammar is:

+```json
 {
     "type": "delete",
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "segments": <JSON list of DataSegment objects to append>

@@ -214,6 +217,7 @@ Kill tasks delete all information about a segment and removes it from deep stora

+```json
 {
     "type": "kill",
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "segments": <JSON list of DataSegment objects to append>

@@ -229,6 +233,7 @@ These tasks convert segments from an existing older index version to the latest

+```json
 {
     "type": "version_converter",
     "id": <task_id>,
     "groupId" : <task_group_id>,
     "dataSource": <task_datasource>,

@@ -243,6 +248,7 @@ These tasks start, sleep for a time and are used only for testing. The available

+```json
 {
     "type": "noop",
     "id": <optional_task_id>,
     "interval" : <optional_segment_interval>,
     "runTime" : <optional_millis_to_sleep>,

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
   @JsonIgnore
   private final HadoopDruidIndexerSchema schema;
   @JsonIgnore
-  private final String hadoopCoordinates;
+  private final List<String> hadoopDependencyCoordinates;

   /**
    * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters

@@ -86,7 +86,8 @@ public class HadoopIndexTask extends AbstractTask
   public HadoopIndexTask(
       @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerSchema schema,
-      @JsonProperty("hadoopCoordinates") String hadoopCoordinates
+      @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
+      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
   )
   {
     super(

@@ -100,7 +101,9 @@ public class HadoopIndexTask extends AbstractTask
     Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");

     this.schema = schema;
-    this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
+    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
+        hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
+    ) : hadoopDependencyCoordinates;
   }

   @Override
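A note on the defaulting above: an explicit `hadoopDependencyCoordinates` list wins; otherwise the task falls back to a one-element list built from the legacy `hadoopCoordinates` value, or from the build-time default when that is also absent. A minimal standalone sketch of that precedence rule (the class name, the helper method, and the default coordinate value here are illustrative, not part of the Druid API):

```java
import java.util.Arrays;
import java.util.List;

public class HadoopCoordinateDefaults
{
  // Illustrative default; the real task reads its default coordinate from a static field.
  private static final String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";

  // Mirrors the constructor logic in the hunk above: explicit list first,
  // then the single legacy coordinate, then the default.
  public static List<String> resolve(String hadoopCoordinates, List<String> hadoopDependencyCoordinates)
  {
    if (hadoopDependencyCoordinates != null) {
      return hadoopDependencyCoordinates;
    }
    return Arrays.asList(hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates);
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(null, null));                                    // default coordinate
    System.out.println(resolve("org.apache.hadoop:hadoop-client:2.4.0", null)); // legacy single coordinate
    System.out.println(resolve(null, Arrays.asList("a:b:1", "c:d:2")));         // explicit list wins
  }
}
```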

@@ -132,20 +135,16 @@ public class HadoopIndexTask extends AbstractTask
   }

   @JsonProperty
-  public String getHadoopCoordinates()
+  public List<String> getHadoopDependencyCoordinates()
   {
-    return hadoopCoordinates;
+    return hadoopDependencyCoordinates;
   }

   @SuppressWarnings("unchecked")
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     // setup Hadoop
     final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
-    final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
-        aetherClient, hadoopCoordinates
-    );
-
     final List<URL> extensionURLs = Lists.newArrayList();
     for (String coordinate : extensionsConfig.getCoordinates()) {

@@ -161,7 +160,12 @@ public class HadoopIndexTask extends AbstractTask
     final List<URL> driverURLs = Lists.newArrayList();
     driverURLs.addAll(nonHadoopURLs);
     // put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
+    for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
+      final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
+          aetherClient, hadoopDependencyCoordinate
+      );
     driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
+    }

     final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
     Thread.currentThread().setContextClassLoader(loader);
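The hunk above is the heart of the change: the driver classpath now starts with Druid's own (non-Hadoop) jars and appends the jars resolved for each Hadoop dependency coordinate, and the combined list becomes an isolated `URLClassLoader` with a `null` parent. Because classes are looked up in URL order, putting the Hadoop jars last means Druid's copies of shared libraries (jets3t, httpcore) win when both sides bundle them. A minimal sketch of that ordering idea, with plain `file:` URLs standing in for the Aether-resolved artifacts (all paths below are hypothetical):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IsolatedDriverClassLoader
{
  /**
   * Builds a classloader whose search order is: Druid/extension jars first, then the
   * Hadoop dependency jars. The null parent keeps the application classloader out of
   * the lookup entirely, so only the listed jars are visible to the Hadoop job driver.
   */
  public static URLClassLoader build(List<URL> nonHadoopUrls, List<List<URL>> hadoopDependencyUrls)
  {
    final List<URL> driverUrls = new ArrayList<>();
    driverUrls.addAll(nonHadoopUrls);
    for (List<URL> urlsForOneCoordinate : hadoopDependencyUrls) {
      driverUrls.addAll(urlsForOneCoordinate);
    }
    return new URLClassLoader(driverUrls.toArray(new URL[driverUrls.size()]), null);
  }

  public static void main(String[] args) throws Exception
  {
    // Hypothetical jar locations, just to exercise the ordering.
    List<URL> druidJars = Arrays.asList(new URL("file:///tmp/druid/druid-indexing.jar"));
    List<List<URL>> hadoopJars = Arrays.asList(
        Arrays.asList(new URL("file:///tmp/hadoop/hadoop-client-2.3.0.jar"))
    );

    URLClassLoader loader = build(druidJars, hadoopJars);
    // The task then runs its Hadoop job with this loader as the thread context classloader.
    Thread.currentThread().setContextClassLoader(loader);
    System.out.println(Arrays.toString(loader.getURLs()));
  }
}
```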

@@ -401,6 +401,7 @@ public class TaskSerdeTest
             null,
             null
         ),
+        null,
         null
     );


@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

pom.xml

@@ -23,7 +23,7 @@
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.6.74-SNAPSHOT</version>
+    <version>0.6.75-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.74-SNAPSHOT</version>
+        <version>0.6.75-SNAPSHOT</version>
     </parent>

     <dependencies>

@@ -0,0 +1,12 @@
all : druid_demo.pdf

clean :
	@rm -f *.aux *.bbl *.blg *.log

%.tex : %.bib

%.pdf : %.tex %.bib
	lualatex $(*F)
	bibtex $(*F)
	lualatex $(*F)
	lualatex $(*F)

@@ -0,0 +1,54 @@
\relax
\providecommand\HyperFirstAtBeginDocument{\AtBeginDocument}
\HyperFirstAtBeginDocument{\ifx\hyper@anchor\@undefined
\global\let\oldcontentsline\contentsline
\gdef\contentsline#1#2#3#4{\oldcontentsline{#1}{#2}{#3}}
\global\let\oldnewlabel\newlabel
\gdef\newlabel#1#2{\newlabelxx{#1}#2}
\gdef\newlabelxx#1#2#3#4#5#6{\oldnewlabel{#1}{{#2}{#3}}}
\AtEndDocument{\ifx\hyper@anchor\@undefined
\let\contentsline\oldcontentsline
\let\newlabel\oldnewlabel
\fi}
\fi}
\global\let\hyper@last\relax
\gdef\HyperFirstAtBeginDocument#1{#1}
\providecommand\HyField@AuxAddToFields[1]{}
\citation{hunt2010zookeeper}
\@writefile{toc}{\contentsline {section}{\numberline {1}Introduction}{1}{section.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {1.1}The Need for Druid}{1}{subsection.1.1}}
\@writefile{toc}{\contentsline {section}{\numberline {2}Architecture}{1}{section.2}}
\citation{abadi2008column}
\@writefile{lof}{\contentsline {figure}{\numberline {1}{\ignorespaces An overview of a Druid cluster and the flow of data through the cluster.}}{2}{figure.1}}
\newlabel{fig:cluster}{{1}{2}{An overview of a Druid cluster and the flow of data through the cluster}{figure.1}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.1}Real-time Nodes}{2}{subsection.2.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.2}Historical Nodes}{2}{subsection.2.2}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.3}Broker Nodes}{2}{subsection.2.3}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.4}Coordinator Nodes}{2}{subsection.2.4}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.5}Query Processing}{2}{subsection.2.5}}
\citation{tomasic1993performance}
\citation{colantonio2010concise}
\@writefile{lot}{\contentsline {table}{\numberline {1}{\ignorespaces Sample sales data set.}}{3}{table.1}}
\newlabel{tab:sample_data}{{1}{3}{Sample sales data set}{table.1}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.6}Query Capabilities}{3}{subsection.2.6}}
\@writefile{lof}{\contentsline {figure}{\numberline {2}{\ignorespaces Query latencies of production data sources.}}{3}{figure.2}}
\newlabel{fig:query_latency}{{2}{3}{Query latencies of production data sources}{figure.2}{}}
\@writefile{lof}{\contentsline {figure}{\numberline {3}{\ignorespaces Druid \& MySQL benchmarks -- 100GB TPC-H data.}}{3}{figure.3}}
\newlabel{fig:tpch_100gb}{{3}{3}{Druid \& MySQL benchmarks -- 100GB TPC-H data}{figure.3}{}}
\@writefile{toc}{\contentsline {section}{\numberline {3}Performance}{3}{section.3}}
\@writefile{toc}{\contentsline {subsection}{\numberline {3.1}Query Performance}{3}{subsection.3.1}}
\bibstyle{abbrv}
\bibdata{druid_demo}
\bibcite{abadi2008column}{1}
\bibcite{colantonio2010concise}{2}
\bibcite{hunt2010zookeeper}{3}
\bibcite{tomasic1993performance}{4}
\@writefile{lof}{\contentsline {figure}{\numberline {4}{\ignorespaces Combined cluster ingestion rates.}}{4}{figure.4}}
\newlabel{fig:ingestion_rate}{{4}{4}{Combined cluster ingestion rates}{figure.4}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {3.2}Data Ingestion Performance}{4}{subsection.3.2}}
\@writefile{toc}{\contentsline {section}{\numberline {4}Demonstration Details}{4}{section.4}}
\@writefile{toc}{\contentsline {subsection}{\numberline {4.1}Setup}{4}{subsection.4.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {4.2}Goals}{4}{subsection.4.2}}
\@writefile{toc}{\contentsline {section}{\numberline {5}Acknowledgments}{4}{section.5}}
\@writefile{toc}{\contentsline {section}{\numberline {6}Additional Authors}{4}{section.6}}
\@writefile{toc}{\contentsline {section}{\numberline {7}References}{4}{section.7}}

@@ -0,0 +1,27 @@
\begin{thebibliography}{1}

\bibitem{abadi2008column}
D.~J. Abadi, S.~R. Madden, and N.~Hachem.
\newblock Column-stores vs. row-stores: How different are they really?
\newblock In {\em Proceedings of the 2008 ACM SIGMOD international conference
  on Management of data}, pages 967--980. ACM, 2008.

\bibitem{colantonio2010concise}
A.~Colantonio and R.~Di~Pietro.
\newblock Concise: Compressed ‘n’composable integer set.
\newblock {\em Information Processing Letters}, 110(16):644--650, 2010.

\bibitem{hunt2010zookeeper}
P.~Hunt, M.~Konar, F.~P. Junqueira, and B.~Reed.
\newblock Zookeeper: Wait-free coordination for internet-scale systems.
\newblock In {\em USENIX ATC}, volume~10, 2010.

\bibitem{tomasic1993performance}
A.~Tomasic and H.~Garcia-Molina.
\newblock Performance of inverted indices in shared-nothing distributed text
  document information retrieval systems.
\newblock In {\em Parallel and Distributed Information Systems, 1993.,
  Proceedings of the Second International Conference on}, pages 8--17. IEEE,
  1993.

\end{thebibliography}

@@ -0,0 +1,420 @@
@article{cattell2011scalable,
  title={Scalable SQL and NoSQL data stores},
  author={Cattell, Rick},
  journal={ACM SIGMOD Record},
  volume={39},
  number={4},
  pages={12--27},
  year={2011},
  publisher={ACM}
}

@article{chang2008bigtable,
  title={Bigtable: A distributed storage system for structured data},
  author={Chang, Fay and Dean, Jeffrey and Ghemawat, Sanjay and Hsieh, Wilson C and Wallach, Deborah A and Burrows, Mike and Chandra, Tushar and Fikes, Andrew and Gruber, Robert E},
  journal={ACM Transactions on Computer Systems (TOCS)},
  volume={26},
  number={2},
  pages={4},
  year={2008},
  publisher={ACM}
}

@inproceedings{decandia2007dynamo,
  title={Dynamo: amazon's highly available key-value store},
  author={DeCandia, Giuseppe and Hastorun, Deniz and Jampani, Madan and Kakulapati, Gunavardhan and Lakshman, Avinash and Pilchin, Alex and Sivasubramanian, Swaminathan and Vosshall, Peter and Vogels, Werner},
  booktitle={ACM SIGOPS Operating Systems Review},
  volume={41},
  number={6},
  pages={205--220},
  year={2007},
  organization={ACM}
}

@inproceedings{abadi2008column,
  title={Column-Stores vs. Row-Stores: How different are they really?},
  author={Abadi, Daniel J and Madden, Samuel R and Hachem, Nabil},
  booktitle={Proceedings of the 2008 ACM SIGMOD international conference on Management of data},
  pages={967--980},
  year={2008},
  organization={ACM}
}

@inproceedings{bear2012vertica,
  title={The vertica database: SQL RDBMS for managing big data},
  author={Bear, Chuck and Lamb, Andrew and Tran, Nga},
  booktitle={Proceedings of the 2012 workshop on Management of big data systems},
  pages={37--38},
  year={2012},
  organization={ACM}
}

@article{lakshman2010cassandra,
  title={Cassandra—A decentralized structured storage system},
  author={Lakshman, Avinash and Malik, Prashant},
  journal={Operating systems review},
  volume={44},
  number={2},
  pages={35},
  year={2010}
}

@article{melnik2010dremel,
  title={Dremel: interactive analysis of web-scale datasets},
  author={Melnik, Sergey and Gubarev, Andrey and Long, Jing Jing and Romer, Geoffrey and Shivakumar, Shiva and Tolton, Matt and Vassilakis, Theo},
  journal={Proceedings of the VLDB Endowment},
  volume={3},
  number={1-2},
  pages={330--339},
  year={2010},
  publisher={VLDB Endowment}
}

@article{hall2012processing,
  title={Processing a trillion cells per mouse click},
  author={Hall, Alexander and Bachmann, Olaf and B{\"u}ssow, Robert and G{\u{a}}nceanu, Silviu and Nunkesser, Marc},
  journal={Proceedings of the VLDB Endowment},
  volume={5},
  number={11},
  pages={1436--1446},
  year={2012},
  publisher={VLDB Endowment}
}

@inproceedings{shvachko2010hadoop,
  title={The hadoop distributed file system},
  author={Shvachko, Konstantin and Kuang, Hairong and Radia, Sanjay and Chansler, Robert},
  booktitle={Mass Storage Systems and Technologies (MSST), 2010 IEEE 26th Symposium on},
  pages={1--10},
  year={2010},
  organization={IEEE}
}

@article{colantonio2010concise,
  title={Concise: Compressed ‘n’Composable Integer Set},
  author={Colantonio, Alessandro and Di Pietro, Roberto},
  journal={Information Processing Letters},
  volume={110},
  number={16},
  pages={644--650},
  year={2010},
  publisher={Elsevier}
}

@inproceedings{stonebraker2005c,
  title={C-store: a column-oriented DBMS},
  author={Stonebraker, Mike and Abadi, Daniel J and Batkin, Adam and Chen, Xuedong and Cherniack, Mitch and Ferreira, Miguel and Lau, Edmond and Lin, Amerson and Madden, Sam and O'Neil, Elizabeth and others},
  booktitle={Proceedings of the 31st international conference on Very large data bases},
  pages={553--564},
  year={2005},
  organization={VLDB Endowment}
}

@inproceedings{engle2012shark,
  title={Shark: fast data analysis using coarse-grained distributed memory},
  author={Engle, Cliff and Lupher, Antonio and Xin, Reynold and Zaharia, Matei and Franklin, Michael J and Shenker, Scott and Stoica, Ion},
  booktitle={Proceedings of the 2012 international conference on Management of Data},
  pages={689--692},
  year={2012},
  organization={ACM}
}

@inproceedings{zaharia2012discretized,
  title={Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters},
  author={Zaharia, Matei and Das, Tathagata and Li, Haoyuan and Shenker, Scott and Stoica, Ion},
  booktitle={Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing},
  pages={10--10},
  year={2012},
  organization={USENIX Association}
}

@misc{marz2013storm,
  author = {Marz, Nathan},
  title = {Storm: Distributed and Fault-Tolerant Realtime Computation},
  month = {February},
  year = {2013},
  howpublished = "\url{http://storm-project.net/}"
}

@misc{tschetter2011druid,
  author = {Eric Tschetter},
  title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second},
  month = {April},
  year = {2011},
  howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}"
}

@article{farber2012sap,
  title={SAP HANA database: data management for modern business applications},
  author={F{\"a}rber, Franz and Cha, Sang Kyun and Primsch, J{\"u}rgen and Bornh{\"o}vd, Christof and Sigg, Stefan and Lehner, Wolfgang},
  journal={ACM Sigmod Record},
  volume={40},
  number={4},
  pages={45--51},
  year={2012},
  publisher={ACM}
}

@misc{voltdb2010voltdb,
  title={VoltDB Technical Overview},
  author={VoltDB, LLC},
  year={2010},
  howpublished = "\url{https://voltdb.com/}"
}

@inproceedings{macnicol2004sybase,
  title={Sybase IQ multiplex-designed for analytics},
  author={MacNicol, Roger and French, Blaine},
  booktitle={Proceedings of the Thirtieth international conference on Very large data bases-Volume 30},
  pages={1227--1230},
  year={2004},
  organization={VLDB Endowment}
}

@inproceedings{singh2011introduction,
  title={Introduction to the IBM Netezza warehouse appliance},
  author={Singh, Malcolm and Leonhardi, Ben},
  booktitle={Proceedings of the 2011 Conference of the Center for Advanced Studies on Collaborative Research},
  pages={385--386},
  year={2011},
  organization={IBM Corp.}
}

@inproceedings{miner2012unified,
  title={Unified analytics platform for big data},
  author={Miner, Donald},
  booktitle={Proceedings of the WICSA/ECSA 2012 Companion Volume},
  pages={176--176},
  year={2012},
  organization={ACM}
}

@inproceedings{fink2012distributed,
  title={Distributed computation on dynamo-style distributed storage: riak pipe},
  author={Fink, Bryan},
  booktitle={Proceedings of the eleventh ACM SIGPLAN workshop on Erlang workshop},
  pages={43--50},
  year={2012},
  organization={ACM}
}

@misc{paraccel2013,
  key = {ParAccel Analytic Database},
  title = {ParAccel Analytic Database},
  month = {March},
  year = {2013},
  howpublished = "\url{http://www.paraccel.com/resources/Datasheets/ParAccel-Core-Analytic-Database.pdf}"
}

@misc{cloudera2013,
  key = {Cloudera Impala},
  title = {Cloudera Impala},
  month = {March},
  year = {2013},
  url = {},
  howpublished = "\url{http://blog.cloudera.com/blog}"
}

@inproceedings{hunt2010zookeeper,
  title={ZooKeeper: Wait-free coordination for Internet-scale systems},
  author={Hunt, Patrick and Konar, Mahadev and Junqueira, Flavio P and Reed, Benjamin},
  booktitle={USENIX ATC},
  volume={10},
  year={2010}
}

@inproceedings{kreps2011kafka,
  title={Kafka: A distributed messaging system for log processing},
  author={Kreps, Jay and Narkhede, Neha and Rao, Jun},
  booktitle={Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece},
  year={2011}
}

@misc{liblzf2013,
  title = {LibLZF},
  key = {LibLZF},
  month = {March},
  year = {2013},
  howpublished = "\url{http://freecode.com/projects/liblzf}"
}

@inproceedings{tomasic1993performance,
  title={Performance of inverted indices in shared-nothing distributed text document information retrieval systems},
  author={Tomasic, Anthony and Garcia-Molina, Hector},
  booktitle={Parallel and Distributed Information Systems, 1993., Proceedings of the Second International Conference on},
  pages={8--17},
  year={1993},
  organization={IEEE}
}

@inproceedings{antoshenkov1995byte,
  title={Byte-aligned bitmap compression},
  author={Antoshenkov, Gennady},
  booktitle={Data Compression Conference, 1995. DCC'95. Proceedings},
  pages={476},
  year={1995},
  organization={IEEE}
}

@inproceedings{van2011memory,
  title={A memory efficient reachability data structure through bit vector compression},
  author={van Schaik, Sebastiaan J and de Moor, Oege},
  booktitle={Proceedings of the 2011 international conference on Management of data},
  pages={913--924},
  year={2011},
  organization={ACM}
}

@inproceedings{o1993lru,
  title={The LRU-K page replacement algorithm for database disk buffering},
  author={O'neil, Elizabeth J and O'neil, Patrick E and Weikum, Gerhard},
  booktitle={ACM SIGMOD Record},
  volume={22},
  number={2},
  pages={297--306},
  year={1993},
  organization={ACM}
}

@article{kim2001lrfu,
  title={LRFU: A spectrum of policies that subsumes the least recently used and least frequently used policies},
  author={Kim, Chong Sang},
  journal={IEEE Transactions on Computers},
  volume={50},
  number={12},
  year={2001}
}

@article{wu2006optimizing,
  title={Optimizing bitmap indices with efficient compression},
  author={Wu, Kesheng and Otoo, Ekow J and Shoshani, Arie},
  journal={ACM Transactions on Database Systems (TODS)},
  volume={31},
  number={1},
  pages={1--38},
  year={2006},
  publisher={ACM}
}

@misc{twitter2013,
  key = {Twitter Public Streams},
  title = {Twitter Public Streams},
  month = {March},
  year = {2013},
  howpublished = "\url{https://dev.twitter.com/docs/streaming-apis/streams/public}"
}

@article{fitzpatrick2004distributed,
  title={Distributed caching with memcached},
  author={Fitzpatrick, Brad},
  journal={Linux journal},
  number={124},
  pages={72--74},
  year={2004}
}
@inproceedings{amdahl1967validity,
  title={Validity of the single processor approach to achieving large scale computing capabilities},
  author={Amdahl, Gene M},
  booktitle={Proceedings of the April 18-20, 1967, spring joint computer conference},
  pages={483--485},
  year={1967},
  organization={ACM}
}
@book{sarawagi1998discovery,
  title={Discovery-driven exploration of OLAP data cubes},
  author={Sarawagi, Sunita and Agrawal, Rakesh and Megiddo, Nimrod},
  year={1998},
  publisher={Springer}
}
@article{hu2011stream,
  title={Stream Database Survey},
  author={Hu, Bo},
  year={2011}
}

@article{dean2008mapreduce,
  title={MapReduce: simplified data processing on large clusters},
  author={Dean, Jeffrey and Ghemawat, Sanjay},
  journal={Communications of the ACM},
  volume={51},
  number={1},
  pages={107--113},
  year={2008},
  publisher={ACM}
}

@misc{linkedin2013senseidb,
  author = {LinkedIn},
  title = {SenseiDB},
  month = {July},
  year = {2013},
  howpublished = "\url{http://www.senseidb.com/}"
}

@misc{apache2013solr,
  author = {Apache},
  title = {Apache Solr},
  month = {February},
  year = {2013},
  howpublished = "\url{http://lucene.apache.org/solr/}"
}

@misc{banon2013elasticsearch,
  author = {Banon, Shay},
  title = {ElasticSearch},
  month = {July},
  year = {2013},
  howpublished = "\url{http://www.elasticseach.com/}"
}

@book{oehler2012ibm,
  title={IBM Cognos TM1: The Official Guide},
  author={Oehler, Karsten and Gruenes, Jochen and Ilacqua, Christopher and Perez, Manuel},
  year={2012},
  publisher={McGraw-Hill}
}

@book{schrader2009oracle,
  title={Oracle Essbase \& Oracle OLAP},
  author={Schrader, Michael and Vlamis, Dan and Nader, Mike and Claterbos, Chris and Collins, Dave and Campbell, Mitch and Conrad, Floyd},
  year={2009},
  publisher={McGraw-Hill, Inc.}
}

@book{lachev2005applied,
  title={Applied Microsoft Analysis Services 2005: And Microsoft Business Intelligence Platform},
  author={Lachev, Teo},
  year={2005},
  publisher={Prologika Press}
}

@article{o1996log,
  title={The log-structured merge-tree (LSM-tree)},
  author={O’Neil, Patrick and Cheng, Edward and Gawlick, Dieter and O’Neil, Elizabeth},
  journal={Acta Informatica},
  volume={33},
  number={4},
  pages={351--385},
  year={1996},
  publisher={Springer}
}

@inproceedings{o1997improved,
  title={Improved query performance with variant indexes},
  author={O'Neil, Patrick and Quass, Dallan},
  booktitle={ACM Sigmod Record},
  volume={26},
  number={2},
  pages={38--49},
  year={1997},
  organization={ACM}
}

@inproceedings{cipar2012lazybase,
  title={LazyBase: trading freshness for performance in a scalable database},
  author={Cipar, James and Ganger, Greg and Keeton, Kimberly and Morrey III, Charles B and Soules, Craig AN and Veitch, Alistair},
  booktitle={Proceedings of the 7th ACM european conference on Computer Systems},
  pages={169--182},
  year={2012},
  organization={ACM}
}

@@ -0,0 +1,46 @@
This is BibTeX, Version 0.99d (TeX Live 2012)
Capacity: max_strings=35307, hash_size=35307, hash_prime=30011
The top-level auxiliary file: druid_demo.aux
The style file: abbrv.bst
Database file #1: druid_demo.bib
You've used 4 entries,
            2118 wiz_defined-function locations,
            524 strings with 4556 characters,
and the built_in function-call counts, 1592 in all, are:
= -- 160
> -- 67
< -- 3
+ -- 26
- -- 22
* -- 105
:= -- 251
add.period$ -- 14
call.type$ -- 4
change.case$ -- 23
chr.to.int$ -- 0
cite$ -- 4
duplicate$ -- 67
empty$ -- 133
format.name$ -- 22
if$ -- 349
int.to.chr$ -- 0
int.to.str$ -- 4
missing$ -- 4
newline$ -- 23
num.names$ -- 8
pop$ -- 30
preamble$ -- 1
purify$ -- 19
quote$ -- 0
skip$ -- 47
stack$ -- 0
substring$ -- 96
swap$ -- 22
text.length$ -- 3
text.prefix$ -- 0
top$ -- 0
type$ -- 16
warning$ -- 0
while$ -- 16
width$ -- 5
write$ -- 48

@@ -0,0 +1,18 @@
\BOOKMARK [1][-]{section.1}{Introduction}{}% 1
\BOOKMARK [2][-]{subsection.1.1}{The Need for Druid}{section.1}% 2
\BOOKMARK [1][-]{section.2}{Architecture}{}% 3
\BOOKMARK [2][-]{subsection.2.1}{Real-time Nodes}{section.2}% 4
\BOOKMARK [2][-]{subsection.2.2}{Historical Nodes}{section.2}% 5
\BOOKMARK [2][-]{subsection.2.3}{Broker Nodes}{section.2}% 6
\BOOKMARK [2][-]{subsection.2.4}{Coordinator Nodes}{section.2}% 7
\BOOKMARK [2][-]{subsection.2.5}{Query Processing}{section.2}% 8
\BOOKMARK [2][-]{subsection.2.6}{Query Capabilities}{section.2}% 9
\BOOKMARK [1][-]{section.3}{Performance}{}% 10
\BOOKMARK [2][-]{subsection.3.1}{Query Performance}{section.3}% 11
\BOOKMARK [2][-]{subsection.3.2}{Data Ingestion Performance}{section.3}% 12
\BOOKMARK [1][-]{section.4}{Demonstration Details}{}% 13
\BOOKMARK [2][-]{subsection.4.1}{Setup}{section.4}% 14
\BOOKMARK [2][-]{subsection.4.2}{Goals}{section.4}% 15
\BOOKMARK [1][-]{section.5}{Acknowledgments}{}% 16
\BOOKMARK [1][-]{section.6}{Additional Authors}{}% 17
\BOOKMARK [1][-]{section.7}{References}{}% 18

@@ -0,0 +1,464 @@
% THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012
% based on ACM SIGPROC-SP.TEX VERSION 2.7
% Modified by Gerald Weber <gerald@cs.auckland.ac.nz>
% Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012)
% Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012)

\documentclass{vldb}
\usepackage{graphicx}
\usepackage{balance} % for \balance command ON LAST PAGE (only there!)
\usepackage{fontspec}
\usepackage{hyperref}
\graphicspath{{figures/}}
\usepackage{enumitem}

\begin{document}

% ****************** TITLE ****************************************

\title{Druid: Open Source Real-time Analytics at Scale}

% possible, but not really needed or used for PVLDB:
%\subtitle{[Extended Abstract]
%\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}}

% ****************** AUTHORS **************************************

% You need the command \numberofauthors to handle the 'placement
% and alignment' of the authors beneath the title.
%
% For aesthetic reasons, we recommend 'three authors at a time'
% i.e. three 'name/affiliation blocks' be placed beneath the title.
%
% NOTE: You are NOT restricted in how many 'rows' of
% "name/affiliations" may appear. We just ask that you restrict
% the number of 'columns' to three.
%
% Because of the available 'opening page real-estate'
% we ask you to refrain from putting more than six authors
% (two rows with three columns) beneath the article title.
% More than six makes the first-page appear very cluttered indeed.
%
% Use the \alignauthor commands to handle the names
% and affiliations for an 'aesthetic maximum' of six authors.
% Add names, affiliations, addresses for
% the seventh etc. author(s) as the argument for the
% \additionalauthors command.
% These 'additional authors' will be output/set for you
% without further effort on your part as the last section in
% the body of your article BEFORE References or any Appendices.

\numberofauthors{6} % in this sample file, there are a *total*
% of EIGHT authors. SIX appear on the 'first-page' (for formatting
% reasons) and the remaining two appear in the \additionalauthors section.

\author{
% You can go ahead and credit any number of authors here,
% e.g. one 'row of three' or two rows (consisting of one row of three
% and a second row of one, two or three).
%
% The command \alignauthor (no curly braces needed) should
% precede each author name, affiliation/snail-mail address and
% e-mail address. Additionally, tag each line of
% affiliation/address with \affaddr, and tag the
% e-mail address with \email.
%
% 1st. author
\alignauthor
Fangjin Yang\\
\affaddr{Metamarkets Group, Inc.}\\
\email{fangjin@metamarkets.com}
% 2nd. author
\alignauthor
Eric Tschetter\\
\email{echeddar@gmail.com}
% 3rd. author
\alignauthor
Xavier Léauté\\
\affaddr{Metamarkets Group, Inc.}\\
\email{xavier@metamarkets.com}
\and % use '\and' if you need 'another row' of author names
% 4th. author
\alignauthor
Nishant Bangarwa\\
\affaddr{Metamarkets Group, Inc.}\\
\email{nishant@metamarkets.com}
% 5th. author
\alignauthor
Nelson Ray\\
\email{ncray86@gmail.com}
% 6th. author
\alignauthor
Gian Merlino\\
\affaddr{Metamarkets Group, Inc.}\\
\email{gian@metamarkets.com}
}
% There's nothing stopping you putting the seventh, eighth, etc.
% author on the opening page (as the 'third row') but we ask,
% for aesthetic reasons that you place these 'additional authors'
% in the \additional authors block, viz.
\additionalauthors{Additional authors: Deep Ganguli (Metamarkets Group, Inc., {\texttt{deep@metamarkets.com}}), Himadri Singh (Metamarkets Group, Inc., {\texttt{himadri@metamarkets.com}}), Igal Levy (Metamarkets Group, Inc., {\texttt{igal@metamarkets.com}})}
\date{14 March 2014}
% Just remember to make sure that the TOTAL number of authors
% is the number that will appear on the first page PLUS the
% number that will appear in the \additionalauthors section.


\maketitle

\begin{abstract}
Druid is an open
source\footnote{\href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
data store built for exploratory analytics on large data sets. Druid supports
fast data aggregation, low latency data ingestion, and arbitrary data
exploration. The system combines a column-oriented storage layout, a
distributed, shared-nothing architecture, and an advanced indexing structure to
return queries on billions of rows in milliseconds. Druid is petabyte scale and
is deployed in production at several technology companies.
\end{abstract}

\section{Introduction}
The recent proliferation of internet technology has created a surge
in machine-generated events. Individually, these events contain minimal useful
information and are of low value. Given the time and resources required to
extract meaning from large collections of events, many companies were willing
to discard this data instead.

A few years ago, Google introduced MapReduce as their mechanism of leveraging
commodity hardware to index the internet and analyze logs. The Hadoop project
soon followed and was largely patterned after the insights that came out of the
original MapReduce paper. Hadoop has contributed much to helping companies
convert their low-value event streams into high-value aggregates for a variety
of applications such as business intelligence and A-B testing.

As with a lot of great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
amounts of data, however, it does not make any performance guarantees around
how quickly that data can be accessed. Furthermore, although Hadoop is a
highly available system, performance degrades under heavy concurrent load.
Lastly, while Hadoop works well for storing data, it is not optimized for
ingesting data and making that data immediately readable.

\subsection{The Need for Druid}
Druid was originally designed to solve problems around ingesting and exploring
large quantities of transactional events (log data). This form of timeseries
data (OLAP data) is commonly found in the business intelligence
space and the nature of the data tends to be very append heavy. Events typically
have three distinct components: a timestamp column indicating when the event
occurred, a set of dimension columns indicating various attributes about the
event, and a set of metric columns containing values (usually numeric) that can
be aggregated. Queries are typically issued for the sum of some set of metrics,
filtered by some set of dimensions, over some span of time.

The Druid project first began out of necessity at Metamarkets to power a
business intelligence dashboard that allowed users to arbitrarily explore and
visualize event streams. Existing open source Relational Database Management
Systems, cluster computing frameworks, and NoSQL key/value stores were unable
to provide a low latency data ingestion and query platform for an interactive
dashboard. Queries needed to return fast enough to allow the data
visualizations in the dashboard to update interactively.

In addition to the query latency needs, the system had to be multi-tenant and
highly available, as the dashboard is used in a highly concurrent environment.
Downtime is costly and many businesses cannot afford to wait if a system is
unavailable in the face of software upgrades or network failure. Finally,
Metamarkets also wanted to allow users and alerting systems to be able to make
business decisions in ``real-time''. The time from when an event is created to
when that event is queryable determines how fast users and systems are able to
react to potentially catastrophic occurrences in their systems.

The problems of data exploration, ingestion, and availability span multiple
industries. Since Druid was open sourced in October 2012, it has been deployed as a
video, network monitoring, operations monitoring, and online advertising
analytics platform at multiple companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.

\begin{figure*}
\centering
\includegraphics[width = 4.5in]{cluster}
\caption{An overview of a Druid cluster and the flow of data through the cluster.}
\label{fig:cluster}
\end{figure*}

\section{Architecture}
A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of things. We believe this design separates
concerns and simplifies the complexity of the system. The different node types
operate fairly independently of each other and there is minimal interaction among
them. Hence, intra-cluster communication failures have minimal impact on data
availability. To solve complex data analysis problems, the different node
types come together to form a fully working system. The composition of and flow
of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. All Druid
nodes announce their availability and the data they are serving over
Zookeeper\cite{hunt2010zookeeper}.

\subsection{Real-time Nodes}
Real-time nodes encapsulate the functionality to ingest and query event
streams. Events indexed via these nodes are immediately available for querying.
These nodes are only concerned with events for some small time range. They
periodically hand off batches of immutable events to other nodes in the Druid
cluster that are specialized in dealing with batches of immutable events.

Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. To avoid heap overflow problems, real-time
nodes persist their in-memory indexes to disk either periodically or after some
maximum row limit is reached. This persist process converts data stored in the
in-memory buffer to a column oriented storage format. Each persisted index is
immutable and real-time nodes load persisted indexes into off-heap memory such
that they can still be queried. On a periodic basis, each real-time node will
schedule a background task that searches for all locally persisted indexes. The
task merges these indexes together and builds an immutable block of data that
contains all the events that have been ingested by a real-time node for some span of
time. We refer to this block of data as a ``segment''. During the handoff stage,
a real-time node uploads this segment to permanent backup storage, typically
a distributed file system that Druid calls ``deep storage''.

\subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable
blocks of data (segments) created by real-time nodes. In many real-world
workflows, most of the data loaded in a Druid cluster is immutable and hence
historical nodes are typically the main workers of a Druid cluster. Historical
nodes follow a shared-nothing architecture and there is no single point of
contention among the nodes. The nodes have no knowledge of one another and are
operationally simple; they only know how to load, drop, and serve immutable
segments.

\subsection{Broker Nodes}
Broker nodes act as query routers to historical and real-time nodes. Broker
nodes understand what segments are queryable and where those segments are
located. Broker nodes route incoming queries such that the queries hit the
right historical or real-time nodes. Broker nodes also merge partial results
from historical and real-time nodes before returning a final consolidated
result to the caller.

\subsection{Coordinator Nodes}
Druid coordinator nodes are primarily in charge of data management and
distribution on historical nodes. The coordinator nodes tell historical nodes
to load new data, drop outdated data, replicate data, and move data to load
balance. Coordinator nodes undergo a
leader-election process that determines a single node that runs the coordinator
functionality. The remaining coordinator nodes act as redundant backups.

A coordinator node runs periodically to determine the current state of the
cluster. It makes decisions by comparing the expected state of the cluster with
the actual state of the cluster at the time of the run. Coordinator nodes also
maintain a connection to a MySQL database that contains additional operational
parameters and configurations. One of the key pieces of information located in
the MySQL database is a table that contains a list of all segments that should
be served by historical nodes. This table can be updated by any service that
creates segments, such as real-time nodes.

\subsection{Query Processing}
Data tables in Druid (called \emph{data sources}) are collections of
timestamped events partitioned into a set of segments, where each segment
is typically 5--10 million rows. Formally, we define a segment as a collection
of rows of data that span some period in time. Segments represent the
fundamental storage unit in Druid and replication and distribution are done at
a segment level.

Druid segments are stored in a column orientation. Given that Druid is best
used for aggregating event streams (all data going into Druid must have a
timestamp), the advantages of storing aggregate information as columns rather than
rows are well documented \cite{abadi2008column}. Column storage allows for more
efficient CPU usage as only what is needed is actually loaded and scanned.

Druid has multiple column types to represent various data formats. Depending on
the column type, different compression methods are used to reduce the cost of
storing a column in memory and on disk. For example, if an entire column only
contains string values, storing the raw strings is unnecessarily costly.
String columns can be dictionary encoded instead. Dictionary encoding is a
common method to compress data in column stores.

In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. Consider Table~\ref{tab:sample_data}. An example query for this table may
ask: ``How much revenue was generated in the first hour of 2014-01-01 in the
city of San Francisco?''. This query is filtering a sales data set based on a
Boolean expression of dimension values. In many real world data sets, dimension
columns contain strings and metric columns contain numbers. Druid creates
additional lookup indices for string columns such that only those rows that
pertain to a particular query filter are ever scanned.

\begin{table}
  \centering
  \begin{tabular}{| l | l | l |}
    \hline
    \textbf{Timestamp} & \textbf{City} & \textbf{Revenue} \\ \hline
    2014-01-01T01:00:00Z & San Francisco & 25 \\ \hline
    2014-01-01T01:00:00Z & San Francisco & 42 \\ \hline
    2014-01-01T02:00:00Z & New York & 17 \\ \hline
    2014-01-01T02:00:00Z & New York & 170 \\ \hline
  \end{tabular}
  \caption{Sample sales data set.}
  \label{tab:sample_data}
\end{table}

For each unique city in
Table~\ref{tab:sample_data}, we can form some representation
indicating in which table rows a particular city is seen. We can
store this information in a binary array where the array indices
represent our rows. If a particular city is seen in a certain
row, that array index is marked as \texttt{1}. For example:
{\small\begin{verbatim}
San Francisco -> rows [0, 1] -> [1][1][0][0]
New York      -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}}

\texttt{San Francisco} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. To know which
rows contain {\ttfamily San Francisco} or {\ttfamily New York}, we can \texttt{OR} together
the two arrays.
{\small\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}}

This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Druid compresses each bitmap index using the Concise
algorithm \cite{colantonio2010concise}. All Boolean operations on top of these
Concise sets are done without decompressing the set.

\subsection{Query Capabilities}
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and complex aggregations such as cardinality estimation and
approximate quantile estimation. The results of aggregations can be combined
in mathematical expressions to form other aggregations. Druid supports
different query types ranging from simple aggregates over an interval of time,
to groupBys, to approximate top-K queries.
\section{Performance}
|
||||
Druid runs in production at several organizations, and to briefly demonstrate its
|
||||
performance, we have chosen to share some real world numbers for the main production
|
||||
cluster running at Metamarkets in early 2014. For comparison with other databases
|
||||
we also include results from synthetic workloads on TPC-H data.
|
||||
|
||||
\subsection{Query Performance}
|
||||
Query latencies are shown in Figure~\ref{fig:query_latency} for a cluster
|
||||
hosting approximately 10.5TB of data using 1302 processing threads and 672
|
||||
total cores (hyperthreaded). There are approximately 50 billion rows of data in
|
||||
this cluster.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{avg_query_latency}
|
||||
\caption{Query latencies of production data sources.}
|
||||
\label{fig:query_latency}
|
||||
\end{figure}
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{tpch_100gb}
|
||||
\caption{Druid \& MySQL benchmarks -- 100GB TPC-H data.}
|
||||
\label{fig:tpch_100gb}
|
||||
\end{figure}
|
||||
|
||||
The average queries per minute during this time was approximately
|
||||
1000. The number of dimensions the various data sources vary from 25 to 78
|
||||
dimensions, and 8 to 35 metrics. Across all the various data sources, average
|
||||
query latency is approximately 550 milliseconds, with 90\% of queries returning
|
||||
in less than 1 second, 95\% in under 2 seconds, and 99\% of queries returning
|
||||
in less than 10 seconds.
|
||||
|
||||
Approximately 30\% of the queries are standard
|
||||
aggregates involving different types of metrics and filters, 60\% of queries
|
||||
are ordered group bys over one or more dimensions with aggregates, and 10\% of
|
||||
queries are search queries and metadata retrieval queries. The number of
|
||||
columns scanned in aggregate queries roughly follows an exponential
|
||||
distribution. Queries involving a single column are very frequent, and queries
|
||||
involving all columns are very rare.
|
||||
|
||||
We also present Druid benchmarks on TPC-H data in Figure~\ref{fig:tpch_100g}.
|
||||
Most TPC-H queries do not directly apply to Druid, so we selected queries more
|
||||
typical of Druid's workload to demonstrate query performance. As a comparison,
|
||||
we also provide the results of the same queries using MySQL using the MyISAM
|
||||
engine (InnoDB was slower in our experiments).
|
||||
|
||||
We benchmarked Druid's scan rate at 53,539,211 rows/second/core for
|
||||
\texttt{select count(*)} equivalent query over a given time interval and
|
||||
36,246,530 rows/second/core for a \texttt{select sum(float)} type query.
|
||||
|
||||
\subsection{Data Ingestion Performance}
|
||||
To showcase Druid's data ingestion latency, we selected several production
|
||||
datasources of varying dimensions, metrics, and event volumes. Druid's data
|
||||
ingestion latency is heavily dependent on the complexity of the data set being
|
||||
ingested. The data complexity is determined by the number of dimensions in each
|
||||
event, the number of metrics in each event, and the types of aggregations we
|
||||
want to perform on those metrics.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{ingestion_rate}
|
||||
\caption{Combined cluster ingestion rates.}
|
||||
\label{fig:ingestion_rate}
|
||||
\end{figure}
|
||||
|
||||
For the given datasources, the number of dimensions vary from 5 to 35, and the
|
||||
number of metrics vary from 2 to 24. The peak ingestion latency we measured in
|
||||
production was 22914.43 events/second/core on a datasource with 30 dimensions
|
||||
and 19 metrics.
|
||||
|
||||
The latency measurements we presented are sufficient to address the our stated
|
||||
problems of interactivity. We would prefer the variability in the latencies to
|
||||
be less, which can be achieved by adding additional
|
||||
hardware, but we have not chosen to do so because of cost concerns.
|
||||
|
||||
\section{Demonstration Details}

We would like to give an end-to-end demonstration of Druid: setting up a
cluster, ingesting data, structuring a query, and obtaining results. We would
also like to showcase how to solve real-world data analysis problems with Druid
and demonstrate tools that can be built on top of it, including interactive
data visualizations, approximate algorithms, and machine-learning components.
We already use similar tools in production.

\subsection{Setup}

Users will be able to set up a local Druid cluster to better understand the
components and architecture of the system. Druid is designed to run on
commodity hardware, and Druid nodes are simply Java processes that need to be
started up. The local setup will allow users to ingest data from Twitter's
public API and query it. We will also provide users with access to an
AWS-hosted Druid cluster that contains several terabytes of Twitter data that
we have been collecting for over 2 years. There are over 3 billion tweets in
this data set, and new events are constantly being ingested. We will walk
through a variety of different queries to demonstrate Druid's arbitrary
data-exploration capabilities.

Finally, we will teach users how to build a simple interactive dashboard on top
of Druid. The dashboard will use some of Druid's more powerful features, such
as approximate algorithms for quickly determining the cardinality of sets and
machine-learning algorithms for scientific computing problems such as anomaly
detection. These use cases represent some of the more interesting problems we
use Druid for in production.

\subsection{Goals}

We will not only walk users through solving real-world problems with Druid and
the different tools that have been built on top of it, but also answer
conference-specific questions, such as what the trending tweets and topics at
VLDB are and what netizens in the surrounding area are conversing about, and
even perform a sentiment analysis of VLDB. Our goal is to clearly explain why
the architecture of Druid makes it well suited for certain types of queries,
and to demonstrate the potential of the system as a real-time analytics
platform.

%\end{document} % This is where a 'short' article might terminate

% ensure same length columns on last page (might need two subsequent latex runs)
\balance

%ACKNOWLEDGMENTS are optional
\section{Acknowledgments}
Druid could not have been built without the help of many great people in the
community. We want to thank everyone who has contributed to the Druid codebase
for their invaluable support.

% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle{abbrv}
\bibliography{druid_demo} % druid_demo.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references

\end{document}
|
(Ten binary image files added, 28--85 KiB each.)
|
@ -12,7 +12,7 @@ zip : sgmd0658-yang.zip
|
|||
zip $@ $*.pdf $*.tex dummy.ps
|
||||
|
||||
clean :
|
||||
@rm -f *.aux *.bbl *.blg *.log dummy.ps *.zip
|
||||
@rm -f *.aux *.bbl *.blg *.out *.log dummy.ps *.zip
|
||||
|
||||
%.tex : %.bib
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
\let\crnotice\mycrnotice%
|
||||
\let\confname\myconfname%
|
||||
\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.}
|
||||
\conferenceinfo{SIGMOD/PODS'14,}{June 22--27, 2014, Salt Lake City, UT, USA. \\
|
||||
\conferenceinfo{SIGMOD'14,}{June 22--27, 2014, Snowbird, UT, USA. \\
|
||||
{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}}
|
||||
\copyrightetc{ACM \the\acmcopyr}
|
||||
\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\
|
||||
|
@ -73,7 +73,7 @@ aggregations, flexible filters, and low latency data ingestion.
|
|||
% A category with the (minimum) three required fields
|
||||
\category{H.2.4}{Database Management}{Systems}[Distributed databases]
|
||||
% \category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
|
||||
\keywords{distributed; real-time; fault-tolerant; analytics; OLAP; columnar}
|
||||
\keywords{distributed; real-time; fault-tolerant; analytics; column-oriented; OLAP}
|
||||
|
||||
|
||||
\section{Introduction}
|
||||
|
@ -112,13 +112,12 @@ highly concurrent environment (1000+ users), Hadoop wasn't going to meet our
|
|||
needs. We explored different solutions in the space, and after
|
||||
trying both Relational Database Management Systems and NoSQL architectures, we
|
||||
came to the conclusion that there was nothing in the open source world that
|
||||
could be fully leveraged for our requirements.
|
||||
|
||||
We ended up creating Druid, an open-source, distributed, column-oriented,
|
||||
real-time analytical data store. In many ways, Druid shares similarities with
|
||||
other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
|
||||
could be fully leveraged for our requirements. We ended up creating Druid, an
|
||||
open-source, distributed, column-oriented, real-time analytical data store. In
|
||||
many ways, Druid shares similarities with other OLAP systems
|
||||
\cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
|
||||
interactive query systems \cite{melnik2010dremel}, main-memory databases
|
||||
\cite{farber2012sap}, and widely-known distributed data stores
|
||||
\cite{farber2012sap}, as well as widely known distributed data stores
|
||||
\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The
|
||||
distribution and query model also borrow ideas from current generation search
|
||||
infrastructure \cite{linkedin2013senseidb, apache2013solr,
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.74-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.74-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.74-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.MapCache;
|
||||
import io.druid.query.CacheStrategy;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
|
||||
private final String segmentIdentifier;
|
||||
private final SegmentDescriptor segmentDescriptor;
|
||||
private final QueryRunner<T> base;
|
||||
private final QueryToolChest toolChest;
|
||||
private final Cache cache;
|
||||
private final ObjectMapper mapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
public CachePopulatingQueryRunner(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
|
||||
Cache cache,
|
||||
QueryToolChest toolchest,
|
||||
QueryRunner<T> base,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.base = base;
|
||||
this.segmentIdentifier = segmentIdentifier;
|
||||
this.segmentDescriptor = segmentDescriptor;
|
||||
this.toolChest = toolchest;
|
||||
this.cache = cache;
|
||||
this.mapper = mapper;
|
||||
this.cacheConfig = cacheConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
{
|
||||
|
||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
||||
|
||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
||||
&& strategy != null
|
||||
&& cacheConfig.isPopulateCache()
|
||||
// historical only populates distributed cache since the cache lookups are done at broker.
|
||||
&& !(cache instanceof MapCache) ;
|
||||
Sequence<T> results = base.run(query);
|
||||
if (populateCache) {
|
||||
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
||||
segmentIdentifier,
|
||||
segmentDescriptor,
|
||||
strategy.computeCacheKey(query)
|
||||
);
|
||||
CacheUtil.populate(
|
||||
cache,
|
||||
mapper,
|
||||
key,
|
||||
Sequences.toList(Sequences.map(results, strategy.prepareForCache()), new ArrayList())
|
||||
);
|
||||
}
|
||||
return results;
|
||||
|
||||
}
|
||||
}
|
|
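A note on the class above: the populate path maps each result through strategy.prepareForCache() and then calls Sequences.toList(...), which eagerly materializes the entire per-segment result set in memory before it is written to the cache, so cache population trades memory for avoiding a second scan of the segment. A minimal sketch of those guava-style Sequence helpers follows; Sequences.simple and the example values are assumptions used purely for illustration, not code from this patch.

import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SequenceSketch
{
  public static void main(String[] args)
  {
    // A lazy sequence over some in-memory values (stand-in for per-segment query results).
    Sequence<Integer> results = Sequences.simple(Arrays.asList(1, 2, 3));

    // Map each element (stand-in for strategy.prepareForCache()) and then
    // materialize the whole sequence into a List, as the runner does before caching.
    List<String> cacheable = Sequences.toList(
        Sequences.map(
            results,
            new Function<Integer, String>()
            {
              @Override
              public String apply(Integer input)
              {
                return "prepared-" + input;
              }
            }
        ),
        new ArrayList<String>()
    );

    System.out.println(cacheable); // prints [prepared-1, prepared-2, prepared-3]
  }
}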
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
public class CacheUtil
|
||||
{
|
||||
public static Cache.NamedKey computeSegmentCacheKey(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor descriptor,
|
||||
byte[] queryCacheKey
|
||||
)
|
||||
{
|
||||
final Interval segmentQueryInterval = descriptor.getInterval();
|
||||
final byte[] versionBytes = descriptor.getVersion().getBytes();
|
||||
|
||||
return new Cache.NamedKey(
|
||||
segmentIdentifier, ByteBuffer
|
||||
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
|
||||
.putLong(segmentQueryInterval.getStartMillis())
|
||||
.putLong(segmentQueryInterval.getEndMillis())
|
||||
.put(versionBytes)
|
||||
.putInt(descriptor.getPartitionNumber())
|
||||
.put(queryCacheKey).array()
|
||||
);
|
||||
}
|
||||
|
||||
public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
|
||||
{
|
||||
try {
|
||||
List<byte[]> bytes = Lists.newArrayList();
|
||||
int size = 0;
|
||||
for (Object result : results) {
|
||||
final byte[] array = mapper.writeValueAsBytes(result);
|
||||
size += array.length;
|
||||
bytes.add(array);
|
||||
}
|
||||
|
||||
byte[] valueBytes = new byte[size];
|
||||
int offset = 0;
|
||||
for (byte[] array : bytes) {
|
||||
System.arraycopy(array, 0, valueBytes, offset, array.length);
|
||||
offset += array.length;
|
||||
}
|
||||
|
||||
cache.put(key, valueBytes);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
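For readers following the key layout above: computeSegmentCacheKey packs the segment interval (two longs, 16 bytes), the version bytes, the partition number (4 bytes), and the query-specific cache key into one byte array, namespaced by the segment identifier. A hypothetical usage sketch is below; the identifier, interval, version, and query-key bytes are made-up example values, while the classes and constructors are the ones used elsewhere in this change.

import io.druid.client.CacheUtil;
import io.druid.client.cache.Cache;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;

public class CacheKeySketch
{
  public static void main(String[] args)
  {
    // Example segment coordinates; in the server these come from the segment timeline.
    SegmentDescriptor descriptor = new SegmentDescriptor(
        new Interval("2014-01-01/2014-01-02"),  // segment interval (example value)
        "2014-01-03T00:00:00.000Z",             // segment version (example value)
        0                                       // partition number (example value)
    );

    // Stand-in for strategy.computeCacheKey(query).
    byte[] queryCacheKey = new byte[]{0x01, 0x02};

    Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
        "example_2014-01-01_2014-01-02_2014-01-03T00:00:00.000Z",  // segment identifier (example value)
        descriptor,
        queryCacheKey
    );

    // The namespace is the segment identifier; the key bytes are
    // 8 (start) + 8 (end) + version.length + 4 (partition) + queryCacheKey.length long.
    System.out.println(key);
  }
}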
@ -40,6 +40,7 @@ import com.metamx.common.guava.Sequence;
|
|||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
|
@ -61,8 +62,8 @@ import io.druid.timeline.partition.PartitionChunk;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -76,24 +77,26 @@ import java.util.concurrent.Executors;
|
|||
public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
|
||||
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final TimelineServerView serverView;
|
||||
private final Cache cache;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
@Inject
|
||||
public CachingClusteredClient(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
TimelineServerView serverView,
|
||||
Cache cache,
|
||||
@Smile ObjectMapper objectMapper
|
||||
@Smile ObjectMapper objectMapper,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.serverView = serverView;
|
||||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
this.cacheConfig = cacheConfig;
|
||||
|
||||
serverView.registerSegmentCallback(
|
||||
Executors.newFixedThreadPool(
|
||||
|
@ -122,18 +125,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
|
||||
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
|
||||
|
||||
final boolean useCache = Boolean.parseBoolean((String) query.getContextValue("useCache", "true")) && strategy != null;
|
||||
final boolean populateCache = Boolean.parseBoolean((String) query.getContextValue("populateCache", "true"))
|
||||
&& strategy != null;
|
||||
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
|
||||
final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true"))
|
||||
&& strategy != null
|
||||
&& cacheConfig.isUseCache();
|
||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
||||
&& strategy != null && cacheConfig.isPopulateCache();
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||
|
||||
|
||||
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
final String priority = (String) query.getContextValue("priority", "0");
|
||||
final String priority = query.getContextValue("priority", "0");
|
||||
contextBuilder.put("priority", priority);
|
||||
|
||||
if (populateCache) {
|
||||
contextBuilder.put(CacheConfig.POPULATE_CACHE, "false");
|
||||
contextBuilder.put("bySegment", "true");
|
||||
}
|
||||
contextBuilder.put("intermediate", "true");
|
||||
|
@ -180,7 +186,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
if (queryCacheKey != null) {
|
||||
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
|
||||
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
|
||||
final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
|
||||
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
|
||||
segment.lhs.getSegment().getIdentifier(),
|
||||
segment.rhs,
|
||||
queryCacheKey
|
||||
|
@ -286,7 +292,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
objectMapper.getFactory().createParser(cachedResult),
|
||||
cacheObjectClazz
|
||||
);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
@ -372,26 +379,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
);
|
||||
}
|
||||
|
||||
private Cache.NamedKey computeSegmentCacheKey(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor descriptor,
|
||||
byte[] queryCacheKey
|
||||
)
|
||||
{
|
||||
final Interval segmentQueryInterval = descriptor.getInterval();
|
||||
final byte[] versionBytes = descriptor.getVersion().getBytes();
|
||||
|
||||
return new Cache.NamedKey(
|
||||
segmentIdentifier, ByteBuffer
|
||||
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
|
||||
.putLong(segmentQueryInterval.getStartMillis())
|
||||
.putLong(segmentQueryInterval.getEndMillis())
|
||||
.put(versionBytes)
|
||||
.putInt(descriptor.getPartitionNumber())
|
||||
.put(queryCacheKey).array()
|
||||
);
|
||||
}
|
||||
|
||||
private static class CachePopulator
|
||||
{
|
||||
private final Cache cache;
|
||||
|
@ -407,26 +394,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
public void populate(Iterable<Object> results)
|
||||
{
|
||||
try {
|
||||
List<byte[]> bytes = Lists.newArrayList();
|
||||
int size = 0;
|
||||
for (Object result : results) {
|
||||
final byte[] array = mapper.writeValueAsBytes(result);
|
||||
size += array.length;
|
||||
bytes.add(array);
|
||||
}
|
||||
|
||||
byte[] valueBytes = new byte[size];
|
||||
int offset = 0;
|
||||
for (byte[] array : bytes) {
|
||||
System.arraycopy(array, 0, valueBytes, offset, array.length);
|
||||
offset += array.length;
|
||||
}
|
||||
|
||||
cache.put(key, valueBytes);
|
||||
} catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
CacheUtil.populate(cache, mapper, key, results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class CacheConfig
|
||||
{
|
||||
public static String USE_CACHE = "useCache";
|
||||
public static String POPULATE_CACHE = "populateCache";
|
||||
@JsonProperty
|
||||
private boolean useCache = true;
|
||||
@JsonProperty
|
||||
private boolean populateCache = true;
|
||||
|
||||
public boolean isPopulateCache()
|
||||
{
|
||||
return populateCache;
|
||||
}
|
||||
|
||||
public boolean isUseCache()
|
||||
{
|
||||
return useCache;
|
||||
}
|
||||
}
|
|
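A side note on how this config is populated: the JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class) and "druid.historical.cache" bindings added further down in this change are intended to map runtime properties such as druid.broker.cache.populateCache=false onto these @JsonProperty fields. A rough sketch of the equivalent Jackson mapping, with the inline JSON standing in for the property-backed configuration:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.cache.CacheConfig;

import java.io.IOException;

public class CacheConfigSketch
{
  public static void main(String[] args) throws IOException
  {
    ObjectMapper mapper = new ObjectMapper();

    // Roughly equivalent to setting druid.broker.cache.useCache=true and
    // druid.broker.cache.populateCache=false in the runtime properties.
    CacheConfig config = mapper.readValue(
        "{\"useCache\": true, \"populateCache\": false}",
        CacheConfig.class
    );

    System.out.println(config.isUseCache());      // true
    System.out.println(config.isPopulateCache()); // false
  }
}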
@ -39,7 +39,6 @@ public class LocalCacheProvider implements CacheProvider
|
|||
@Min(0)
|
||||
private int logEvictionCount = 0;
|
||||
|
||||
|
||||
@Override
|
||||
public Cache get()
|
||||
{
|
||||
|
|
|
@ -54,7 +54,6 @@ public class MapCache implements Cache
|
|||
)
|
||||
{
|
||||
this.byteCountingLRUMap = byteCountingLRUMap;
|
||||
|
||||
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
|
||||
|
||||
namespaceId = Maps.newHashMap();
|
||||
|
|
|
@ -83,9 +83,7 @@ public class MemcachedCache implements Cache
|
|||
.build(),
|
||||
AddrUtil.getAddresses(config.getHosts())
|
||||
),
|
||||
config.getMemcachedPrefix(),
|
||||
config.getTimeout(),
|
||||
config.getExpiration()
|
||||
config
|
||||
);
|
||||
} catch(IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -103,15 +101,16 @@ public class MemcachedCache implements Cache
|
|||
private final AtomicLong timeoutCount = new AtomicLong(0);
|
||||
private final AtomicLong errorCount = new AtomicLong(0);
|
||||
|
||||
MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
|
||||
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
|
||||
|
||||
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
|
||||
Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
||||
"memcachedPrefix length [%d] exceeds maximum length [%d]",
|
||||
memcachedPrefix.length(),
|
||||
config.getMemcachedPrefix().length(),
|
||||
MAX_PREFIX_LENGTH);
|
||||
this.timeout = timeout;
|
||||
this.expiration = expiration;
|
||||
this.timeout = config.getTimeout();
|
||||
this.expiration = config.getExpiration();
|
||||
this.client = client;
|
||||
this.memcachedPrefix = memcachedPrefix;
|
||||
this.memcachedPrefix = config.getMemcachedPrefix();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.server.coordination;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Ordering;
|
||||
|
@ -28,8 +29,12 @@ import com.metamx.common.guava.FunctionalIterable;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.client.CachePopulatingQueryRunner;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.collections.CountingMap;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.BySegmentQueryRunner;
|
||||
import io.druid.query.DataSource;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
|
@ -44,7 +49,6 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.ReferenceCountingSegmentQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import io.druid.query.spec.SpecificSegmentQueryRunner;
|
||||
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
|
@ -70,24 +74,27 @@ import java.util.concurrent.ExecutorService;
|
|||
public class ServerManager implements QuerySegmentWalker
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final SegmentLoader segmentLoader;
|
||||
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||
private final ServiceEmitter emitter;
|
||||
private final ExecutorService exec;
|
||||
|
||||
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
|
||||
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
|
||||
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
|
||||
private final Cache cache;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
@Inject
|
||||
public ServerManager(
|
||||
SegmentLoader segmentLoader,
|
||||
QueryRunnerFactoryConglomerate conglomerate,
|
||||
ServiceEmitter emitter,
|
||||
@Processing ExecutorService exec
|
||||
@Processing ExecutorService exec,
|
||||
@Smile ObjectMapper objectMapper,
|
||||
Cache cache,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.segmentLoader = segmentLoader;
|
||||
|
@ -95,8 +102,11 @@ public class ServerManager implements QuerySegmentWalker
|
|||
this.emitter = emitter;
|
||||
|
||||
this.exec = exec;
|
||||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
|
||||
this.dataSources = new HashMap<>();
|
||||
this.cacheConfig = cacheConfig;
|
||||
}
|
||||
|
||||
public Map<String, Long> getDataSourceSizes()
|
||||
|
@ -122,7 +132,9 @@ public class ServerManager implements QuerySegmentWalker
|
|||
* Load a single segment.
|
||||
*
|
||||
* @param segment segment to load
|
||||
*
|
||||
* @return true if the segment was newly loaded, false if it was already loaded
|
||||
*
|
||||
* @throws SegmentLoadingException if the segment cannot be loaded
|
||||
*/
|
||||
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
|
||||
|
@ -130,10 +142,12 @@ public class ServerManager implements QuerySegmentWalker
|
|||
final Segment adapter;
|
||||
try {
|
||||
adapter = segmentLoader.getSegment(segment);
|
||||
} catch (SegmentLoadingException e) {
|
||||
}
|
||||
catch (SegmentLoadingException e) {
|
||||
try {
|
||||
segmentLoader.cleanup(segment);
|
||||
} catch (SegmentLoadingException e1) {
|
||||
}
|
||||
catch (SegmentLoadingException e1) {
|
||||
// ignore
|
||||
}
|
||||
throw e;
|
||||
|
@ -205,7 +219,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
try {
|
||||
log.info("Attempting to close segment %s", segment.getIdentifier());
|
||||
oldQueryable.close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.makeAlert(e, "Exception closing segment")
|
||||
.addData("dataSource", dataSource)
|
||||
.addData("segmentId", segment.getIdentifier())
|
||||
|
@ -241,7 +256,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
String dataSourceName;
|
||||
try {
|
||||
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
|
||||
} catch (ClassCastException e) {
|
||||
}
|
||||
catch (ClassCastException e) {
|
||||
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
|
||||
}
|
||||
|
||||
|
@ -287,13 +303,12 @@ public class ServerManager implements QuerySegmentWalker
|
|||
factory,
|
||||
toolChest,
|
||||
input.getObject(),
|
||||
new SpecificSegmentSpec(
|
||||
new SegmentDescriptor(
|
||||
holder.getInterval(),
|
||||
holder.getVersion(),
|
||||
input.getChunkNumber()
|
||||
)
|
||||
)
|
||||
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -326,7 +341,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
String dataSourceName;
|
||||
try {
|
||||
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
|
||||
} catch (ClassCastException e) {
|
||||
}
|
||||
catch (ClassCastException e) {
|
||||
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
|
||||
}
|
||||
|
||||
|
@ -360,7 +376,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
|
||||
final ReferenceCountingSegment adapter = chunk.getObject();
|
||||
return Arrays.asList(
|
||||
buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
|
||||
buildAndDecorateQueryRunner(factory, toolChest, adapter, input)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -376,9 +392,10 @@ public class ServerManager implements QuerySegmentWalker
|
|||
final QueryRunnerFactory<T, Query<T>> factory,
|
||||
final QueryToolChest<T, Query<T>> toolChest,
|
||||
final ReferenceCountingSegment adapter,
|
||||
final QuerySegmentSpec segmentSpec
|
||||
final SegmentDescriptor segmentDescriptor
|
||||
)
|
||||
{
|
||||
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
|
||||
return new SpecificSegmentQueryRunner<T>(
|
||||
new MetricsEmittingQueryRunner<T>(
|
||||
emitter,
|
||||
|
@ -393,7 +410,15 @@ public class ServerManager implements QuerySegmentWalker
|
|||
new BySegmentQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
adapter.getDataInterval().getStart(),
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
|
||||
new CachePopulatingQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
segmentDescriptor,
|
||||
objectMapper,
|
||||
cache,
|
||||
toolChest,
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
|
||||
cacheConfig
|
||||
)
|
||||
)
|
||||
).withWaitMeasuredFromNow(),
|
||||
segmentSpec
|
||||
|
|
|
@ -36,6 +36,7 @@ import com.metamx.common.guava.Sequence;
|
|||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.guava.nary.TrinaryFn;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.MapCache;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.RandomServerSelectorStrategy;
|
||||
|
@ -1193,7 +1194,8 @@ public class CachingClusteredClientTest
|
|||
}
|
||||
},
|
||||
cache,
|
||||
jsonMapper
|
||||
jsonMapper,
|
||||
new CacheConfig()
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,19 +38,23 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
public class MemcachedCacheBenchmark extends SimpleBenchmark
|
||||
{
|
||||
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
|
||||
public static final String NAMESPACE = "default";
|
||||
|
||||
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
|
||||
private static byte[] randBytes;
|
||||
@Param({"localhost:11211"})
|
||||
String hosts;
|
||||
// object size in kB
|
||||
@Param({"1", "5", "10", "40"})
|
||||
int objectSize;
|
||||
@Param({"100", "1000"})
|
||||
int objectCount;
|
||||
private MemcachedCache cache;
|
||||
private MemcachedClientIF client;
|
||||
|
||||
private static byte[] randBytes;
|
||||
|
||||
@Param({"localhost:11211"}) String hosts;
|
||||
|
||||
// object size in kB
|
||||
@Param({"1", "5", "10", "40"}) int objectSize;
|
||||
@Param({"100", "1000"}) int objectCount;
|
||||
public static void main(String[] args) throws Exception
|
||||
{
|
||||
Runner.main(MemcachedCacheBenchmark.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception
|
||||
|
@ -73,11 +77,29 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
|
|||
AddrUtil.getAddresses(hosts)
|
||||
);
|
||||
|
||||
|
||||
cache = new MemcachedCache(
|
||||
client,
|
||||
"druid-memcached-benchmark",
|
||||
30000, // 30 seconds
|
||||
3600 // 1 hour
|
||||
new MemcachedCacheConfig()
|
||||
{
|
||||
@Override
|
||||
public String getMemcachedPrefix()
|
||||
{
|
||||
return "druid-memcached-benchmark";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeout()
|
||||
{
|
||||
return 30000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getExpiration()
|
||||
{
|
||||
return 3600;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
randBytes = new byte[objectSize * 1024];
|
||||
|
@ -90,9 +112,10 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
|
|||
client.shutdown(1, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
public void timePutObjects(int reps) {
|
||||
for(int i = 0; i < reps; ++i) {
|
||||
for(int k = 0; k < objectCount; ++k) {
|
||||
public void timePutObjects(int reps)
|
||||
{
|
||||
for (int i = 0; i < reps; ++i) {
|
||||
for (int k = 0; k < objectCount; ++k) {
|
||||
String key = BASE_KEY + k;
|
||||
cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
|
||||
}
|
||||
|
@ -101,11 +124,12 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
|
|||
}
|
||||
}
|
||||
|
||||
public long timeGetObject(int reps) {
|
||||
public long timeGetObject(int reps)
|
||||
{
|
||||
byte[] bytes = null;
|
||||
long count = 0;
|
||||
for (int i = 0; i < reps; i++) {
|
||||
for(int k = 0; k < objectCount; ++k) {
|
||||
for (int k = 0; k < objectCount; ++k) {
|
||||
String key = BASE_KEY + k;
|
||||
bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
|
||||
count += bytes.length;
|
||||
|
@ -114,24 +138,21 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
|
|||
return count;
|
||||
}
|
||||
|
||||
public long timeBulkGetObjects(int reps) {
|
||||
public long timeBulkGetObjects(int reps)
|
||||
{
|
||||
long count = 0;
|
||||
for (int i = 0; i < reps; i++) {
|
||||
List<Cache.NamedKey> keys = Lists.newArrayList();
|
||||
for(int k = 0; k < objectCount; ++k) {
|
||||
for (int k = 0; k < objectCount; ++k) {
|
||||
String key = BASE_KEY + k;
|
||||
keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
|
||||
}
|
||||
Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
|
||||
for(Cache.NamedKey key : keys) {
|
||||
for (Cache.NamedKey key : keys) {
|
||||
byte[] bytes = results.get(key);
|
||||
count += bytes.length;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Runner.main(MemcachedCacheBenchmark.class, args);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,29 @@ public class MemcachedCacheTest
|
|||
public void setUp() throws Exception
|
||||
{
|
||||
MemcachedClientIF client = new MockMemcachedClient();
|
||||
cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
|
||||
cache = new MemcachedCache(
|
||||
client, new MemcachedCacheConfig()
|
||||
{
|
||||
@Override
|
||||
public String getMemcachedPrefix()
|
||||
{
|
||||
return "druid-memcached-test";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeout()
|
||||
{
|
||||
return 500;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getExpiration()
|
||||
{
|
||||
return 3600;
|
||||
}
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -438,7 +460,7 @@ class MockMemcachedClient implements MemcachedClientIF
|
|||
{
|
||||
Map<String, T> retVal = Maps.newHashMap();
|
||||
|
||||
while(keys.hasNext()) {
|
||||
while (keys.hasNext()) {
|
||||
String key = keys.next();
|
||||
CachedData data = theMap.get(key);
|
||||
retVal.put(key, data != null ? tc.decode(data) : null);
|
||||
|
|
|
@ -37,7 +37,10 @@ import com.metamx.common.guava.YieldingAccumulator;
|
|||
import com.metamx.common.guava.YieldingSequenceBase;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.LocalCacheProvider;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.ConcatQueryRunner;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.NoopQueryRunner;
|
||||
|
@ -136,7 +139,8 @@ public class ServerManagerTest
|
|||
}
|
||||
},
|
||||
new NoopServiceEmitter(),
|
||||
serverManagerExec
|
||||
serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get(),
|
||||
new CacheConfig()
|
||||
);
|
||||
|
||||
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
|
||||
|
@ -592,9 +596,7 @@ public class ServerManagerTest
|
|||
{
|
||||
private final String version;
|
||||
private final Interval interval;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
SegmentForTesting(
|
||||
|
|
|
@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.LocalCacheProvider;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.curator.CuratorTestBase;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
|
@ -52,12 +54,12 @@ import java.util.List;
|
|||
*/
|
||||
public class ZkCoordinatorTest extends CuratorTestBase
|
||||
{
|
||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
private ZkCoordinator zkCoordinator;
|
||||
private ServerManager serverManager;
|
||||
private DataSegmentAnnouncer announcer;
|
||||
private File infoDir;
|
||||
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
|
@ -80,7 +82,10 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
new CacheTestSegmentLoader(),
|
||||
new NoopQueryRunnerFactoryConglomerate(),
|
||||
new NoopServiceEmitter(),
|
||||
MoreExecutors.sameThreadExecutor()
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new DefaultObjectMapper(),
|
||||
new LocalCacheProvider().get(),
|
||||
new CacheConfig()
|
||||
);
|
||||
|
||||
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
|
||||
|
@ -100,7 +105,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
|
||||
zkCoordinator = new ZkCoordinator(
|
||||
jsonMapper,
|
||||
new SegmentLoaderConfig(){
|
||||
new SegmentLoaderConfig()
|
||||
{
|
||||
@Override
|
||||
public File getInfoDir()
|
||||
{
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.74-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,6 +28,7 @@ import io.druid.client.BrokerServerView;
|
|||
import io.druid.client.CachingClusteredClient;
|
||||
import io.druid.client.TimelineServerView;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.CacheMonitor;
|
||||
import io.druid.client.cache.CacheProvider;
|
||||
import io.druid.client.selector.ServerSelectorStrategy;
|
||||
|
@ -81,7 +82,7 @@ public class CliBroker extends ServerRunnable
|
|||
|
||||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
|
||||
|
||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||
|
||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
|
|
@ -54,6 +54,10 @@ public class CliHadoopIndexer implements Runnable
|
|||
description = "The maven coordinates to the version of hadoop to run with. Defaults to org.apache.hadoop:hadoop-core:1.0.3")
|
||||
private String hadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
|
||||
|
||||
@Option(name = "hadoopDependencies",
|
||||
description = "The maven coordinates to the version of hadoop and all dependencies to run with. Defaults to using org.apache.hadoop:hadoop-core:1.0.3")
|
||||
private List<String> hadoopDependencyCoordinates = Arrays.<String>asList("org.apache.hadoop:hadoop-core:1.0.3");
|
||||
|
||||
@Inject
|
||||
private ExtensionsConfig extensionsConfig = null;
|
||||
|
||||
|
@ -63,9 +67,6 @@ public class CliHadoopIndexer implements Runnable
|
|||
{
|
||||
try {
|
||||
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
|
||||
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
|
||||
aetherClient, hadoopCoordinates
|
||||
);
|
||||
|
||||
final List<URL> extensionURLs = Lists.newArrayList();
|
||||
for (String coordinate : extensionsConfig.getCoordinates()) {
|
||||
|
@ -81,7 +82,12 @@ public class CliHadoopIndexer implements Runnable
|
|||
final List<URL> driverURLs = Lists.newArrayList();
|
||||
driverURLs.addAll(nonHadoopURLs);
|
||||
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
|
||||
for (String coordinate : hadoopDependencyCoordinates) {
|
||||
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
|
||||
aetherClient, coordinate
|
||||
);
|
||||
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
|
||||
}
|
||||
|
||||
final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
|
|
|
@ -24,7 +24,11 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Module;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.airlift.command.Command;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.CacheProvider;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
|
@ -73,6 +77,10 @@ public class CliHistorical extends ServerRunnable
|
|||
|
||||
LifecycleModule.register(binder, ZkCoordinator.class);
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
||||
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
|
||||
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|