diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index ed13f9ddf39..71beaa26652 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -37,14 +37,14 @@ public abstract class BaseQuery implements Query { public static String QUERYID = "queryId"; private final DataSource dataSource; - private final Map context; + private final Map context; private final QuerySegmentSpec querySegmentSpec; private volatile Duration duration; public BaseQuery( DataSource dataSource, QuerySegmentSpec querySegmentSpec, - Map context + Map context ) { Preconditions.checkNotNull(dataSource, "dataSource can't be null"); @@ -102,28 +102,28 @@ public abstract class BaseQuery implements Query } @JsonProperty - public Map getContext() + public Map getContext() { return context; } @Override - public String getContextValue(String key) + public ContextType getContextValue(String key) { - return context == null ? null : context.get(key); + return context == null ? null : (ContextType) context.get(key); } @Override - public String getContextValue(String key, String defaultValue) + public ContextType getContextValue(String key, ContextType defaultValue) { - String retVal = getContextValue(key); + ContextType retVal = getContextValue(key); return retVal == null ? defaultValue : retVal; } - protected Map computeOverridenContext(Map overrides) + protected Map computeOverridenContext(Map overrides) { - Map overridden = Maps.newTreeMap(); - final Map context = getContext(); + Map overridden = Maps.newTreeMap(); + final Map context = getContext(); if (context != null) { overridden.putAll(context); } @@ -135,28 +135,41 @@ public abstract class BaseQuery implements Query @Override public String getId() { - return getContextValue(QUERYID); + return (String) getContextValue(QUERYID); } @Override public Query withId(String id) { - return withOverriddenContext(ImmutableMap.of(QUERYID, id)); + return withOverriddenContext(ImmutableMap.of(QUERYID, id)); } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } BaseQuery baseQuery = (BaseQuery) o; - if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false; - if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false; - if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false; - if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null) + if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) { return false; + } + if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) { + return false; + } + if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) { + return false; + } + if (querySegmentSpec != null + ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) + : baseQuery.querySegmentSpec != null) { + return false; + } return true; } diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index 79c6a4e03a9..d6150f63456 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -53,7 +53,7 @@ public class BySegmentQueryRunner implements QueryRunner @SuppressWarnings("unchecked") public Sequence run(final Query query) { - if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { + if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { final Sequence baseSequence = base.run(query); return new Sequence() { diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 1711fb316f9..8e666c30b16 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner @Override public Sequence run(Query query) { - if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { + if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { return baseRunner.run(query); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 83d2ff48f98..d3600068a23 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { - final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); return new BaseSequence>( new BaseSequence.IteratorMaker>() diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index fd2abd13ab8..3ab6b0a8ff7 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -304,7 +304,7 @@ public class Druids private QueryGranularity granularity; private List aggregatorSpecs; private List postAggregatorSpecs; - private Map context; + private Map context; private TimeseriesQueryBuilder() { @@ -384,7 +384,7 @@ public class Druids return postAggregatorSpecs; } - public Map getContext() + public Map getContext() { return context; } @@ -465,7 +465,7 @@ public class Druids return this; } - public TimeseriesQueryBuilder context(Map c) + public TimeseriesQueryBuilder context(Map c) { context = c; return this; @@ -505,7 +505,7 @@ public class Druids private QuerySegmentSpec querySegmentSpec; private List dimensions; private SearchQuerySpec querySpec; - private Map context; + private Map context; public SearchQueryBuilder() { @@ -660,7 +660,7 @@ public class Druids return this; } - public SearchQueryBuilder context(Map c) + public SearchQueryBuilder context(Map c) { context = c; return this; @@ -690,7 +690,7 @@ public class Druids { private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; - private Map context; + private Map context; public TimeBoundaryQueryBuilder() { @@ -746,7 +746,7 @@ public class Druids return this; } - public TimeBoundaryQueryBuilder context(Map c) + public TimeBoundaryQueryBuilder context(Map c) { context = c; return this; diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 77823a60730..2880332e184 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -48,7 +48,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { - final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment")); + final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment")); final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true")); if (shouldFinalize) { Function finalizerFn; @@ -100,7 +100,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } return Sequences.map( - baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))), + baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))), finalizerFn ); } diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index fb98968fb43..10dde9b26ea 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner query, configSupplier.get() ); - final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); if (Iterables.isEmpty(queryables)) { log.warn("No queryables found."); diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index d58798539ba..10a84328584 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -70,11 +70,11 @@ public interface Query public Duration getDuration(); - public String getContextValue(String key); + public ContextType getContextValue(String key); - public String getContextValue(String key, String defaultValue); + public ContextType getContextValue(String key, ContextType defaultValue); - public Query withOverriddenContext(Map contextOverride); + public Query withOverriddenContext(Map contextOverride); public Query withQuerySegmentSpec(QuerySegmentSpec spec); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 80322a29531..3f04f30b8aa 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -86,7 +86,7 @@ public class GroupByQuery extends BaseQuery @JsonProperty("having") HavingSpec havingSpec, @JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("orderBy") LimitSpec orderBySpec, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -147,7 +147,7 @@ public class GroupByQuery extends BaseQuery HavingSpec havingSpec, LimitSpec orderBySpec, Function, Sequence> orderByLimitFn, - Map context + Map context ) { super(dataSource, querySegmentSpec, context); @@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery } @Override - public GroupByQuery withOverriddenContext(Map contextOverride) + public GroupByQuery withOverriddenContext(Map contextOverride) { return new GroupByQuery( getDataSource(), @@ -268,7 +268,7 @@ public class GroupByQuery extends BaseQuery private List postAggregatorSpecs; private HavingSpec havingSpec; - private Map context; + private Map context; private LimitSpec limitSpec = null; private List orderByColumnSpecs = Lists.newArrayList(); @@ -443,7 +443,7 @@ public class GroupByQuery extends BaseQuery return this; } - public Builder setContext(Map context) + public Builder setContext(Map context) { this.context = context; return this; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 0f032f50121..1b77f2299ba 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -58,7 +58,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); + private static final Map NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); private final Supplier configSupplier; private GroupByQueryEngine engine; // For running the outer query around a subquery @@ -80,7 +80,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest run(Query input) { - if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { + if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner); } else { return runner.run(input); diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 73c60b840a8..098a5462d3b 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -42,7 +42,7 @@ public class SegmentMetadataQuery extends BaseQuery @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("toInclude") ColumnIncluderator toInclude, @JsonProperty("merge") Boolean merge, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -77,7 +77,7 @@ public class SegmentMetadataQuery extends BaseQuery } @Override - public Query withOverriddenContext(Map contextOverride) + public Query withOverriddenContext(Map contextOverride) { return new SegmentMetadataQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 59fe269de51..6e14ef1c1f3 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> @JsonProperty("searchDimensions") List dimensions, @JsonProperty("query") SearchQuerySpec querySpec, @JsonProperty("sort") SearchSortSpec sortSpec, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -112,7 +112,7 @@ public class SearchQuery extends BaseQuery> } @Override - public SearchQuery withOverriddenContext(Map contextOverrides) + public SearchQuery withOverriddenContext(Map contextOverrides) { return new SearchQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index bcd29cb7f96..7556006734f 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -53,7 +53,7 @@ public class SelectQuery extends BaseQuery> @JsonProperty("dimensions") List dimensions, @JsonProperty("metrics") List metrics, @JsonProperty("pagingSpec") PagingSpec pagingSpec, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -120,7 +120,7 @@ public class SelectQuery extends BaseQuery> ); } - public SelectQuery withOverriddenContext(Map contextOverrides) + public SelectQuery withOverriddenContext(Map contextOverrides) { return new SelectQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java index bad00f2bb87..357854f1958 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQuery.java @@ -54,7 +54,7 @@ public class TimeBoundaryQuery extends BaseQuery public TimeBoundaryQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super( @@ -78,7 +78,7 @@ public class TimeBoundaryQuery extends BaseQuery } @Override - public TimeBoundaryQuery withOverriddenContext(Map contextOverrides) + public TimeBoundaryQuery withOverriddenContext(Map contextOverrides) { return new TimeBoundaryQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index a1de320f5ec..3a03018a63e 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -55,7 +55,7 @@ public class TimeseriesQuery extends BaseQuery> @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("aggregations") List aggregatorSpecs, @JsonProperty("postAggregations") List postAggregatorSpecs, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -116,7 +116,7 @@ public class TimeseriesQuery extends BaseQuery> ); } - public TimeseriesQuery withOverriddenContext(Map contextOverrides) + public TimeseriesQuery withOverriddenContext(Map contextOverrides) { return new TimeseriesQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index dc59a663b2c..0e7a796d045 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -62,7 +62,7 @@ public class TopNQuery extends BaseQuery> @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("aggregations") List aggregatorSpecs, @JsonProperty("postAggregations") List postAggregatorSpecs, - @JsonProperty("context") Map context + @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); @@ -178,7 +178,7 @@ public class TopNQuery extends BaseQuery> ); } - public TopNQuery withOverriddenContext(Map contextOverrides) + public TopNQuery withOverriddenContext(Map contextOverrides) { return new TopNQuery( getDataSource(), diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java index 21efd3b8351..8f78ffe6191 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryBuilder.java @@ -69,7 +69,7 @@ public class TopNQueryBuilder private QueryGranularity granularity; private List aggregatorSpecs; private List postAggregatorSpecs; - private Map context; + private Map context; public TopNQueryBuilder() { @@ -130,7 +130,7 @@ public class TopNQueryBuilder return postAggregatorSpecs; } - public Map getContext() + public Map getContext() { return context; } @@ -290,7 +290,7 @@ public class TopNQueryBuilder return this; } - public TopNQueryBuilder context(Map c) + public TopNQueryBuilder context(Map c) { context = c; return this; diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 6f0400a1f1b..a7d77fde396 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest dummy.ps zip $@ $*.pdf $*.tex dummy.ps clean : diff --git a/publications/whitepaper/druid.pdf b/publications/whitepaper/druid.pdf index 1a1f0972952..baf917851fd 100644 Binary files a/publications/whitepaper/druid.pdf and b/publications/whitepaper/druid.pdf differ diff --git a/publications/whitepaper/druid.tex b/publications/whitepaper/druid.tex index 3340e83a0ac..aa3bed9579d 100644 --- a/publications/whitepaper/druid.tex +++ b/publications/whitepaper/druid.tex @@ -1,6 +1,17 @@ \documentclass{sig-alternate-2013} -\input{sig-license.tex} +\newfont{\mycrnotice}{ptmr8t at 7pt} +\newfont{\myconfname}{ptmri8t at 7pt} +\let\crnotice\mycrnotice% +\let\confname\myconfname% +\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.} +\conferenceinfo{SIGMOD/PODS'14,}{June 22--27, 2014, Salt Lake City, UT, USA. \\ +{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}} +\copyrightetc{ACM \the\acmcopyr} +\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\ +\href{http://dx.doi.org/10.1145/2588555.2595631}{http://dx.doi.org/10.1145/2588555.2595631}} +\clubpenalty=10000 +\widowpenalty = 10000 \usepackage{graphicx} \usepackage{balance} @@ -50,8 +61,7 @@ \maketitle \begin{abstract} -Druid is an open -source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}} +Druid is an open source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}} data store designed for real-time exploratory analytics on large data sets. The system combines a column-oriented storage layout, a distributed, shared-nothing architecture, and an advanced indexing structure to allow for @@ -120,7 +130,6 @@ service, and attempts to help inform anyone who faces a similar problem about a potential method of solving it. Druid is deployed in production at several technology companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}. - The structure of the paper is as follows: we first describe the problem in Section \ref{sec:problem-definition}. Next, we detail system architecture from the point of view of how data flows through the system in Section @@ -134,6 +143,21 @@ in Section \ref{sec:related}. \section{Problem Definition} \label{sec:problem-definition} +\begin{table*} + \centering + \begin{tabular}{| l | l | l | l | l | l | l | l |} + \hline + \textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline + 2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline + 2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline + 2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline + 2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline + \end{tabular} + \caption{Sample Druid data for edits that have occurred on Wikipedia.} + \label{tab:sample_data} +\end{table*} + + Druid was originally designed to solve problems around ingesting and exploring large quantities of transactional events (log data). This form of timeseries data is commonly found in OLAP workflows and the nature of the data tends to be @@ -149,20 +173,6 @@ there are a set of metric columns that contain values (usually numeric) that can be aggregated, such as the number of characters added or removed in an edit. -\begin{table*} - \centering - \begin{tabular}{| l | l | l | l | l | l | l | l |} - \hline - \textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline - 2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline - 2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline - 2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline - 2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline - \end{tabular} - \caption{Sample Druid data for edits that have occurred on Wikipedia.} - \label{tab:sample_data} -\end{table*} - Our goal is to rapidly compute drill-downs and aggregates over this data. We want to answer questions like “How many edits were made on the page Justin Bieber from males in San Francisco?” and “What is the average number of @@ -207,12 +217,13 @@ designed to perform a specific set of things. We believe this design separates concerns and simplifies the complexity of the system. The different node types operate fairly independent of each other and there is minimal interaction among them. Hence, intra-cluster communication failures have minimal impact -on data availability. To solve complex data analysis problems, the different -node types come together to form a fully working system. The name Druid comes -from the Druid class in many role-playing games: it is a shape-shifter, capable -of taking on many different forms to fulfill various different roles in a -group. The composition of and flow of data in a Druid cluster are shown in -Figure~\ref{fig:cluster}. +on data availability. + +To solve complex data analysis problems, the different +node types come together to form a fully working system. The composition of and +flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. The name Druid comes from the Druid class in many role-playing games: it is a +shape-shifter, capable of taking on many different forms to fulfill various +different roles in a group. \begin{figure*} \centering @@ -221,7 +232,6 @@ Figure~\ref{fig:cluster}. \label{fig:cluster} \end{figure*} -\newpage \subsection{Real-time Nodes} \label{sec:realtime} Real-time nodes encapsulate the functionality to ingest and query event @@ -249,10 +259,11 @@ in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}. \begin{figure} \centering \includegraphics[width = 2.6in]{realtime_flow} -\caption{Real-time nodes first buffer events in memory. On a periodic basis, -the in-memory index is persisted to disk. On another periodic basis, all -persisted indexes are merged together and handed off. Queries will hit the -in-memory index and the persisted indexes.} +\caption{Real-time nodes buffer events to an in-memory index, which is +regularly persisted to disk. On a periodic basis, persisted indexes are then merged +together before getting handed off. +Queries will hit both the in-memory and persisted indexes. +} \label{fig:realtime_flow} \end{figure} @@ -417,9 +428,7 @@ caching the results would be unreliable. \begin{figure*} \centering \includegraphics[width = 4.5in]{caching} -\caption{Broker nodes cache per segment results. Every Druid query is mapped to -a set of segments. Queries often combine cached segment results with those that -need to be computed on historical and real-time nodes.} +\caption{Results are cached per segment. Queries combine cached results with results computed on historical and real-time nodes.} \label{fig:caching} \end{figure*} @@ -791,7 +800,7 @@ involving all columns are very rare. \begin{table} \centering - \begin{tabular}{| l | l | l |} + \scriptsize\begin{tabular}{| l | l | l |} \hline \textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline \texttt{a} & 25 & 21 \\ \hline @@ -803,6 +812,7 @@ involving all columns are very rare. \texttt{g} & 26 & 18 \\ \hline \texttt{h} & 78 & 14 \\ \hline \end{tabular} + \normalsize \caption{Characteristics of production data sources.} \label{tab:datasources} \end{table} @@ -900,6 +910,7 @@ well. \begin{figure} \centering \includegraphics[width = 2.3in]{tpch_scaling} +\includegraphics[width = 2.3in]{tpch_scaling_factor} \caption{Druid scaling benchmarks -- 100GB TPC-H data.} \label{fig:tpch_scaling} \end{figure} diff --git a/publications/whitepaper/figures/caching.pdf b/publications/whitepaper/figures/caching.pdf new file mode 100644 index 00000000000..e42b317148d Binary files /dev/null and b/publications/whitepaper/figures/caching.pdf differ diff --git a/publications/whitepaper/figures/caching.png b/publications/whitepaper/figures/caching.png deleted file mode 100644 index e3ee4dd94df..00000000000 Binary files a/publications/whitepaper/figures/caching.png and /dev/null differ diff --git a/publications/whitepaper/figures/cluster.pdf b/publications/whitepaper/figures/cluster.pdf new file mode 100644 index 00000000000..649be901107 Binary files /dev/null and b/publications/whitepaper/figures/cluster.pdf differ diff --git a/publications/whitepaper/figures/cluster.png b/publications/whitepaper/figures/cluster.png deleted file mode 100644 index 1a07213e2cd..00000000000 Binary files a/publications/whitepaper/figures/cluster.png and /dev/null differ diff --git a/publications/whitepaper/figures/historical_download.pdf b/publications/whitepaper/figures/historical_download.pdf new file mode 100644 index 00000000000..20c2c733831 Binary files /dev/null and b/publications/whitepaper/figures/historical_download.pdf differ diff --git a/publications/whitepaper/figures/historical_download.png b/publications/whitepaper/figures/historical_download.png deleted file mode 100644 index a9db3fd3609..00000000000 Binary files a/publications/whitepaper/figures/historical_download.png and /dev/null differ diff --git a/publications/whitepaper/figures/realtime_flow.pdf b/publications/whitepaper/figures/realtime_flow.pdf new file mode 100644 index 00000000000..182e64d2c4c Binary files /dev/null and b/publications/whitepaper/figures/realtime_flow.pdf differ diff --git a/publications/whitepaper/figures/realtime_flow.png b/publications/whitepaper/figures/realtime_flow.png deleted file mode 100644 index 4f50a5c54b5..00000000000 Binary files a/publications/whitepaper/figures/realtime_flow.png and /dev/null differ diff --git a/publications/whitepaper/figures/realtime_pipeline.pdf b/publications/whitepaper/figures/realtime_pipeline.pdf new file mode 100644 index 00000000000..581671d58e2 Binary files /dev/null and b/publications/whitepaper/figures/realtime_pipeline.pdf differ diff --git a/publications/whitepaper/figures/realtime_pipeline.png b/publications/whitepaper/figures/realtime_pipeline.png deleted file mode 100644 index f338d239117..00000000000 Binary files a/publications/whitepaper/figures/realtime_pipeline.png and /dev/null differ diff --git a/publications/whitepaper/figures/realtime_timeline.pdf b/publications/whitepaper/figures/realtime_timeline.pdf new file mode 100644 index 00000000000..339502b66cd Binary files /dev/null and b/publications/whitepaper/figures/realtime_timeline.pdf differ diff --git a/publications/whitepaper/figures/realtime_timeline.png b/publications/whitepaper/figures/realtime_timeline.png deleted file mode 100644 index 76806bc9aa2..00000000000 Binary files a/publications/whitepaper/figures/realtime_timeline.png and /dev/null differ diff --git a/publications/whitepaper/figures/tpch_scaling.pdf b/publications/whitepaper/figures/tpch_scaling.pdf new file mode 100644 index 00000000000..428e2d60c0f Binary files /dev/null and b/publications/whitepaper/figures/tpch_scaling.pdf differ diff --git a/publications/whitepaper/figures/tpch_scaling.png b/publications/whitepaper/figures/tpch_scaling.png deleted file mode 100644 index e929da0c5dd..00000000000 Binary files a/publications/whitepaper/figures/tpch_scaling.png and /dev/null differ diff --git a/publications/whitepaper/figures/tpch_scaling_factor.pdf b/publications/whitepaper/figures/tpch_scaling_factor.pdf new file mode 100644 index 00000000000..7317fb4fac7 Binary files /dev/null and b/publications/whitepaper/figures/tpch_scaling_factor.pdf differ diff --git a/publications/whitepaper/sgmd0658-yang.pdf b/publications/whitepaper/sgmd0658-yang.pdf new file mode 100644 index 00000000000..e3c654ebf72 Binary files /dev/null and b/publications/whitepaper/sgmd0658-yang.pdf differ diff --git a/publications/whitepaper/sgmd0658-yang.zip b/publications/whitepaper/sgmd0658-yang.zip new file mode 100644 index 00000000000..83e16e2e850 Binary files /dev/null and b/publications/whitepaper/sgmd0658-yang.zip differ diff --git a/publications/whitepaper/sig-license.tex b/publications/whitepaper/sig-license.tex deleted file mode 100644 index 4c639061afd..00000000000 --- a/publications/whitepaper/sig-license.tex +++ /dev/null @@ -1,12 +0,0 @@ -\newfont{\mycrnotice}{ptmr8t at 7pt} -\newfont{\myconfname}{ptmri8t at 7pt} -\let\crnotice\mycrnotice% -\let\confname\myconfname% -\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.} -\conferenceinfo{SIGMOD'14,}{June 22--27, 2014, Snowbird, UT, USA. \\ -{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}} -\copyrightetc{ACM \the\acmcopyr} -\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\ -Include the http://DOI string/url which is specific for your submission and included in the ACM rightsreview confirmation email upon completing your ACM form} -\clubpenalty=10000 -\widowpenalty = 10000 diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c5bbbedd774..82c6e51d498 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -132,7 +132,7 @@ public class CachingClusteredClient implements QueryRunner final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); - ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); final String priority = query.getContextValue("priority", "0"); contextBuilder.put("priority", priority); diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManager.java b/server/src/main/java/io/druid/db/DatabaseRuleManager.java index 9a1c014b36f..5882c964346 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManager.java @@ -159,7 +159,7 @@ public class DatabaseRuleManager this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); - createDefaultRule(dbi, getRulesTable(), config.get().getDefaultTier(), jsonMapper); + createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper); ScheduledExecutors.scheduleWithFixedDelay( exec, new Duration(0), @@ -274,8 +274,8 @@ public class DatabaseRuleManager if (theRules.get(dataSource) != null) { retVal.addAll(theRules.get(dataSource)); } - if (theRules.get(config.get().getDefaultTier()) != null) { - retVal.addAll(theRules.get(config.get().getDefaultTier())); + if (theRules.get(config.get().getDefaultRule()) != null) { + retVal.addAll(theRules.get(config.get().getDefaultRule())); } return retVal; } diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java b/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java index 8b770a57955..63f506649b5 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java @@ -27,14 +27,14 @@ import org.joda.time.Period; public class DatabaseRuleManagerConfig { @JsonProperty - private String defaultTier = "_default"; + private String defaultRule = "_default"; @JsonProperty private Period pollDuration = new Period("PT1M"); - public String getDefaultTier() + public String getDefaultRule() { - return defaultTier; + return defaultRule; } public Period getPollDuration() diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java b/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java index dfc25bea283..0296f25867e 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java @@ -64,7 +64,7 @@ public class DatabaseRuleManagerProvider implements Provider + * * @return Returns the set of modules loaded. */ - public static Set getLoadedModules(Class clazz) + public static Set getLoadedModules(Class clazz) { Set retVal = extensionsMap.get(clazz); if (retVal == null) { @@ -190,22 +191,29 @@ public class Initialization ) ); - final List artifacts = aether.resolveArtifacts(dependencyRequest); - List urls = Lists.newArrayListWithExpectedSize(artifacts.size()); - for (Artifact artifact : artifacts) { - if (!exclusions.contains(artifact.getGroupId())) { - urls.add(artifact.getFile().toURI().toURL()); - } else { - log.debug("Skipped Artifact[%s]", artifact); + try { + final List artifacts = aether.resolveArtifacts(dependencyRequest); + + List urls = Lists.newArrayListWithExpectedSize(artifacts.size()); + for (Artifact artifact : artifacts) { + if (!exclusions.contains(artifact.getGroupId())) { + urls.add(artifact.getFile().toURI().toURL()); + } else { + log.debug("Skipped Artifact[%s]", artifact); + } } - } - for (URL url : urls) { - log.info("Added URL[%s]", url); - } + for (URL url : urls) { + log.info("Added URL[%s]", url); + } - loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()); - loadersMap.put(coordinate, loader); + loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader()); + loadersMap.put(coordinate, loader); + } + catch (Exception e) { + log.error(e, "Unable to resolve artifacts for [%s].", dependencyRequest); + throw Throwables.propagate(e); + } } return loader; } @@ -232,9 +240,9 @@ public class Initialization URI u = new URI(uri); Repository r = new Repository(uri); - if(u.getUserInfo() != null) { + if (u.getUserInfo() != null) { String[] auth = u.getUserInfo().split(":", 2); - if(auth.length == 2) { + if (auth.length == 2) { r.setUsername(auth[0]); r.setPassword(auth[1]); } else { @@ -247,7 +255,7 @@ public class Initialization } remoteRepositories.add(r); } - catch(URISyntaxException e) { + catch (URISyntaxException e) { throw Throwables.propagate(e); } } @@ -261,28 +269,30 @@ public class Initialization PrintStream oldOut = System.out; try { - System.setOut(new PrintStream( - new OutputStream() - { - @Override - public void write(int b) throws IOException - { + System.setOut( + new PrintStream( + new OutputStream() + { + @Override + public void write(int b) throws IOException + { - } + } - @Override - public void write(byte[] b) throws IOException - { + @Override + public void write(byte[] b) throws IOException + { - } + } - @Override - public void write(byte[] b, int off, int len) throws IOException - { + @Override + public void write(byte[] b, int off, int len) throws IOException + { - } - } - )); + } + } + ) + ); return new DefaultTeslaAether( config.getLocalRepository(), remoteRepositories.toArray(new Repository[remoteRepositories.size()]) diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java index 510cb29c55a..a96c7f2cf80 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverDropRule.java @@ -22,6 +22,7 @@ package io.druid.server.coordinator.rules; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import org.joda.time.Interval; /** */ @@ -39,4 +40,10 @@ public class ForeverDropRule extends DropRule { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java index 2150150cb03..2909741906e 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/ForeverLoadRule.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import org.joda.time.Interval; import java.util.Map; @@ -66,4 +67,10 @@ public class ForeverLoadRule extends LoadRule { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalDropRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalDropRule.java index 8ea0bd30574..30e75b7d31c 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/IntervalDropRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalDropRule.java @@ -55,6 +55,12 @@ public class IntervalDropRule extends DropRule @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { - return interval.contains(segment.getInterval()); + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp) + { + return interval.contains(theInterval); } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java index 9b2f67c652d..3164da690be 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/IntervalLoadRule.java @@ -85,7 +85,13 @@ public class IntervalLoadRule extends LoadRule @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) { - return interval.contains(segment.getInterval()); + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp) + { + return interval.contains(theInterval); } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodDropRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodDropRule.java index 1e7b1208628..c45345804d0 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/PeriodDropRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodDropRule.java @@ -55,8 +55,14 @@ public class PeriodDropRule extends DropRule @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp) { final Interval currInterval = new Interval(period, referenceTimestamp); - return currInterval.contains(segment.getInterval()); + return currInterval.contains(theInterval); } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java index bfad025b4a5..e996fa9e464 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/PeriodLoadRule.java @@ -86,8 +86,14 @@ public class PeriodLoadRule extends LoadRule @Override public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return appliesTo(segment.getInterval(), referenceTimestamp); + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) { final Interval currInterval = new Interval(period, referenceTimestamp); - return currInterval.overlaps(segment.getInterval()); + return currInterval.overlaps(interval); } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java index d3d809232dc..c3da9eccfde 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/Rule.java @@ -26,6 +26,7 @@ import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; +import org.joda.time.Interval; /** */ @@ -37,13 +38,15 @@ import org.joda.time.DateTime; @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class), @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class), @JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class) - }) + public interface Rule { public String getType(); public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp); + public boolean appliesTo(Interval interval, DateTime referenceTimestamp); + public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment); } diff --git a/server/src/main/java/io/druid/server/router/BrokerSelector.java b/server/src/main/java/io/druid/server/router/BrokerSelector.java new file mode 100644 index 00000000000..e27acf09c10 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/BrokerSelector.java @@ -0,0 +1,178 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.metamx.common.Pair; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; +import io.druid.server.coordinator.rules.LoadRule; +import io.druid.server.coordinator.rules.Rule; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class BrokerSelector +{ + private static EmittingLogger log = new EmittingLogger(BrokerSelector.class); + + private final CoordinatorRuleManager ruleManager; + private final TierConfig tierConfig; + private final ServerDiscoveryFactory serverDiscoveryFactory; + private final ConcurrentHashMap selectorMap = new ConcurrentHashMap(); + + private final Object lock = new Object(); + + private volatile boolean started = false; + + @Inject + public BrokerSelector( + CoordinatorRuleManager ruleManager, + TierConfig tierConfig, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + this.ruleManager = ruleManager; + this.tierConfig = tierConfig; + this.serverDiscoveryFactory = serverDiscoveryFactory; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + try { + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); + selector.start(); + selectorMap.put(entry.getValue(), selector); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + started = true; + } + } + + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + try { + for (ServerDiscoverySelector selector : selectorMap.values()) { + selector.stop(); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + started = false; + } + } + + public Pair select(final Query query) + { + synchronized (lock) { + if (!ruleManager.isStarted() || !started) { + return null; + } + } + + List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); + + // find the rule that can apply to the entire set of intervals + DateTime now = new DateTime(); + int lastRulePosition = -1; + LoadRule baseRule = null; + + for (Interval interval : query.getIntervals()) { + int currRulePosition = 0; + for (Rule rule : rules) { + if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) { + lastRulePosition = currRulePosition; + baseRule = (LoadRule) rule; + break; + } + currRulePosition++; + } + } + + if (baseRule == null) { + return null; + } + + // in the baseRule, find the broker of highest priority + String brokerServiceName = null; + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + if (baseRule.getTieredReplicants().containsKey(entry.getKey())) { + brokerServiceName = entry.getValue(); + break; + } + } + + if (brokerServiceName == null) { + log.makeAlert( + "WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].", + query.getDataSource(), + query.getIntervals(), + tierConfig.getDefaultBrokerServiceName() + ).emit(); + brokerServiceName = tierConfig.getDefaultBrokerServiceName(); + } + + ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); + + if (retVal == null) { + log.makeAlert( + "WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", + brokerServiceName, + tierConfig.getDefaultBrokerServiceName() + ).emit(); + retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); + } + + return new Pair<>(brokerServiceName, retVal); + } +} diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java new file mode 100644 index 00000000000..9acf5f94858 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -0,0 +1,193 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Charsets; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.client.selector.Server; +import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Json; +import io.druid.server.coordinator.rules.Rule; +import org.joda.time.Duration; + +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +@ManageLifecycle +public class CoordinatorRuleManager +{ + private static final Logger log = new Logger(CoordinatorRuleManager.class); + + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + private final Supplier config; + private final ServerDiscoverySelector selector; + + private final StatusResponseHandler responseHandler; + private final AtomicReference>> rules; + + private volatile ScheduledExecutorService exec; + + private final Object lock = new Object(); + + private volatile boolean started = false; + + @Inject + public CoordinatorRuleManager( + @Global HttpClient httpClient, + @Json ObjectMapper jsonMapper, + Supplier config, + ServerDiscoverySelector selector + ) + { + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + this.config = config; + this.selector = selector; + + this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); + this.rules = new AtomicReference<>( + new ConcurrentHashMap>() + ); + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + this.exec = Execs.scheduledSingleThreaded("CoordinatorRuleManager-Exec--%d"); + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + new Duration(0), + config.get().getPollPeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + poll(); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + rules.set(new ConcurrentHashMap>()); + + started = false; + exec.shutdownNow(); + exec = null; + } + } + + public boolean isStarted() + { + return started; + } + + public void poll() + { + try { + String url = getRuleURL(); + if (url == null) { + return; + } + + StatusResponseHolder response = httpClient.get(new URL(url)) + .go(responseHandler) + .get(); + + ConcurrentHashMap> newRules = new ConcurrentHashMap>( + (Map>) jsonMapper.readValue( + response.getContent(), new TypeReference>>() + { + } + ) + ); + + log.info("Got [%,d] rules", newRules.keySet().size()); + + rules.set(newRules); + } + catch (Exception e) { + log.error(e, "Exception while polling for rules"); + } + } + + public List getRulesWithDefault(final String dataSource) + { + List retVal = Lists.newArrayList(); + Map> theRules = rules.get(); + if (theRules.get(dataSource) != null) { + retVal.addAll(theRules.get(dataSource)); + } + if (theRules.get(config.get().getDefaultRule()) != null) { + retVal.addAll(theRules.get(config.get().getDefaultRule())); + } + return retVal; + } + + private String getRuleURL() + { + Server server = selector.pick(); + + if (server == null) { + log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName()); + return null; + } + + return String.format("http://%s%s", server.getHost(), config.get().getRulesEndpoint()); + } +} diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java new file mode 100644 index 00000000000..92bf43135fe --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java @@ -0,0 +1,82 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.metamx.http.client.HttpClient; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.guice.annotations.Global; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.SegmentDescriptor; +import org.joda.time.Interval; + +/** + */ +public class RouterQuerySegmentWalker implements QuerySegmentWalker +{ + private final QueryToolChestWarehouse warehouse; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + private final BrokerSelector brokerSelector; + private final TierConfig tierConfig; + + @Inject + public RouterQuerySegmentWalker( + QueryToolChestWarehouse warehouse, + ObjectMapper objectMapper, + @Global HttpClient httpClient, + BrokerSelector brokerSelector, + TierConfig tierConfig + ) + { + this.warehouse = warehouse; + this.objectMapper = objectMapper; + this.httpClient = httpClient; + this.brokerSelector = brokerSelector; + this.tierConfig = tierConfig; + } + + @Override + public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) + { + return makeRunner(); + } + + @Override + public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) + { + return makeRunner(); + } + + private QueryRunner makeRunner() + { + return new TierAwareQueryRunner( + warehouse, + objectMapper, + httpClient, + brokerSelector, + tierConfig + ); + } +} diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java new file mode 100644 index 00000000000..3a098e0faed --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java @@ -0,0 +1,121 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.metamx.common.Pair; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.HttpClient; +import io.druid.client.DirectDruidClient; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryToolChestWarehouse; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class TierAwareQueryRunner implements QueryRunner +{ + private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class); + + private final QueryToolChestWarehouse warehouse; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + private final BrokerSelector brokerSelector; + private final TierConfig tierConfig; + + private final ConcurrentHashMap serverBackup = new ConcurrentHashMap(); + + public TierAwareQueryRunner( + QueryToolChestWarehouse warehouse, + ObjectMapper objectMapper, + HttpClient httpClient, + BrokerSelector brokerSelector, + TierConfig tierConfig + ) + { + this.warehouse = warehouse; + this.objectMapper = objectMapper; + this.httpClient = httpClient; + this.brokerSelector = brokerSelector; + this.tierConfig = tierConfig; + } + + public Server findServer(Query query) + { + final Pair selected = brokerSelector.select(query); + final String brokerServiceName = selected.lhs; + final ServerDiscoverySelector selector = selected.rhs; + + Server server = selector.pick(); + if (server == null) { + log.error( + "WTF?! No server found for brokerServiceName[%s]. Using backup", + brokerServiceName + ); + + server = serverBackup.get(brokerServiceName); + + if (server == null) { + log.makeAlert( + "WTF?! No backup found for brokerServiceName[%s]. Using default[%s]", + brokerServiceName, + tierConfig.getDefaultBrokerServiceName() + ).emit(); + + server = serverBackup.get(tierConfig.getDefaultBrokerServiceName()); + } + } else { + serverBackup.put(brokerServiceName, server); + } + + return server; + } + + @Override + public Sequence run(Query query) + { + Server server = findServer(query); + + if (server == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + return Sequences.empty(); + } + + QueryRunner client = new DirectDruidClient( + warehouse, + objectMapper, + httpClient, + server.getHost() + ); + + return client.run(query); + } +} diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TierConfig.java new file mode 100644 index 00000000000..c819edfce23 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TierConfig.java @@ -0,0 +1,91 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import io.druid.client.DruidServer; +import org.joda.time.Period; + +import javax.validation.constraints.NotNull; +import java.util.LinkedHashMap; + +/** + */ +public class TierConfig +{ + @JsonProperty + @NotNull + private String defaultBrokerServiceName = ""; + + @JsonProperty + private LinkedHashMap tierToBrokerMap; + + @JsonProperty + @NotNull + private String defaultRule = "_default"; + + @JsonProperty + @NotNull + private String rulesEndpoint = "/druid/coordinator/v1/rules"; + + @JsonProperty + @NotNull + private String coordinatorServiceName = null; + + @JsonProperty + @NotNull + private Period pollPeriod = new Period("PT1M"); + + // tier, + public LinkedHashMap getTierToBrokerMap() + { + return tierToBrokerMap == null ? new LinkedHashMap<>( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, defaultBrokerServiceName + ) + ) : tierToBrokerMap; + } + + public String getDefaultBrokerServiceName() + { + return defaultBrokerServiceName; + } + + public String getDefaultRule() + { + return defaultRule; + } + + public String getRulesEndpoint() + { + return rulesEndpoint; + } + + public String getCoordinatorServiceName() + { + return coordinatorServiceName; + } + + public Period getPollPeriod() + { + return pollPeriod; + } +} diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index c3aa46a3226..017baf9bae8 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -122,7 +122,7 @@ public class CachingClusteredClientTest */ private static final int RANDOMNESS = 10; - public static final ImmutableMap CONTEXT = ImmutableMap.of(); + public static final ImmutableMap CONTEXT = ImmutableMap.of(); public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.of()); public static final String DATA_SOURCE = "test"; @@ -326,7 +326,7 @@ public class CachingClusteredClientTest testQueryCaching( 1, true, - builder.context(ImmutableMap.of("useCache", "false", + builder.context(ImmutableMap.of("useCache", "false", "populateCache", "true")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -340,7 +340,7 @@ public class CachingClusteredClientTest testQueryCaching( 1, false, - builder.context(ImmutableMap.of("useCache", "false", + builder.context(ImmutableMap.of("useCache", "false", "populateCache", "false")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -352,7 +352,7 @@ public class CachingClusteredClientTest testQueryCaching( 1, false, - builder.context(ImmutableMap.of("useCache", "true", + builder.context(ImmutableMap.of("useCache", "true", "populateCache", "false")).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 3e75d1b79c3..24542e70df2 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -122,6 +122,12 @@ public class LoadRuleTest { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } }; DruidCluster druidCluster = new DruidCluster( @@ -214,6 +220,12 @@ public class LoadRuleTest { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } }; DruidServer server1 = new DruidServer( diff --git a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java b/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java new file mode 100644 index 00000000000..55e5dda44ea --- /dev/null +++ b/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java @@ -0,0 +1,230 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.Pair; +import com.metamx.http.client.HttpClient; +import io.druid.client.DruidServer; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Json; +import io.druid.query.Druids; +import io.druid.query.TableDataSource; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.timeboundary.TimeBoundaryQuery; +import io.druid.server.coordinator.rules.IntervalLoadRule; +import io.druid.server.coordinator.rules.Rule; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +/** + */ +public class BrokerSelectorTest +{ + private ServerDiscoveryFactory factory; + private ServerDiscoverySelector selector; + private BrokerSelector brokerSelector; + + @Before + public void setUp() throws Exception + { + factory = EasyMock.createMock(ServerDiscoveryFactory.class); + selector = EasyMock.createMock(ServerDiscoverySelector.class); + + brokerSelector = new BrokerSelector( + new TestRuleManager(null, null, null, null), + new TierConfig() + { + @Override + public LinkedHashMap getTierToBrokerMap() + { + return new LinkedHashMap( + ImmutableMap.of( + "hot", "hotBroker", + "medium", "mediumBroker", + DruidServer.DEFAULT_TIER, "coldBroker" + ) + ); + } + + @Override + public String getDefaultBrokerServiceName() + { + return "hotBroker"; + } + }, + factory + ); + EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); + EasyMock.replay(factory); + + selector.start(); + EasyMock.expectLastCall().atLeastOnce(); + selector.stop(); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(selector); + + brokerSelector.start(); + + } + + @After + public void tearDown() throws Exception + { + brokerSelector.stop(); + + EasyMock.verify(selector); + EasyMock.verify(factory); + } + + @Test + public void testBasicSelect() throws Exception + { + String brokerName = (String) brokerSelector.select( + new TimeBoundaryQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2011-08-31/2011-09-01"))), + null + ) + ).lhs; + + Assert.assertEquals("coldBroker", brokerName); + } + + + @Test + public void testBasicSelect2() throws Exception + { + String brokerName = (String) brokerSelector.select( + new TimeBoundaryQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2013-08-31/2013-09-01"))), + null + ) + ).lhs; + + Assert.assertEquals("hotBroker", brokerName); + } + + @Test + public void testSelectMatchesNothing() throws Exception + { + Pair retVal = brokerSelector.select( + new TimeBoundaryQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2010-08-31/2010-09-01"))), + null + ) + ); + + Assert.assertEquals(null, retVal); + } + + + @Test + public void testSelectMultiInterval() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2013-08-31/2013-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2011-08-31/2011-09-01") + ) + ) + ).build() + ).lhs; + + Assert.assertEquals("coldBroker", brokerName); + } + + @Test + public void testSelectMultiInterval2() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ).build() + ).lhs; + + Assert.assertEquals("coldBroker", brokerName); + } + + private static class TestRuleManager extends CoordinatorRuleManager + { + public TestRuleManager( + @Global HttpClient httpClient, + @Json ObjectMapper jsonMapper, + Supplier config, + ServerDiscoverySelector selector + ) + { + super(httpClient, jsonMapper, config, selector); + } + + @Override + public boolean isStarted() + { + return true; + } + + @Override + public List getRulesWithDefault(String dataSource) + { + return Arrays.asList( + new IntervalLoadRule(new Interval("2013/2014"), ImmutableMap.of("hot", 1), null, null), + new IntervalLoadRule(new Interval("2012/2013"), ImmutableMap.of("medium", 1), null, null), + new IntervalLoadRule( + new Interval("2011/2012"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, 1), + null, + null + ) + ); + } + } +} diff --git a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java b/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java new file mode 100644 index 00000000000..d788daa2f2b --- /dev/null +++ b/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java @@ -0,0 +1,139 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.collect.ImmutableMap; +import com.metamx.common.Pair; +import io.druid.client.DruidServer; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; +import io.druid.query.TableDataSource; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.timeboundary.TimeBoundaryQuery; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.LinkedHashMap; + +/** + */ +public class TierAwareQueryRunnerTest +{ + private ServerDiscoverySelector selector; + private BrokerSelector brokerSelector; + private TierConfig config; + private Server server; + + @Before + public void setUp() throws Exception + { + selector = EasyMock.createMock(ServerDiscoverySelector.class); + brokerSelector = EasyMock.createMock(BrokerSelector.class); + + config = new TierConfig() + { + @Override + public LinkedHashMap getTierToBrokerMap() + { + return new LinkedHashMap<>( + ImmutableMap.of( + "hot", "hotBroker", + "medium", "mediumBroker", + DruidServer.DEFAULT_TIER, "coldBroker" + ) + ); + } + + @Override + public String getDefaultBrokerServiceName() + { + return "hotBroker"; + } + }; + + server = new Server() + { + @Override + public String getScheme() + { + return null; + } + + @Override + public String getHost() + { + return "foo"; + } + + @Override + public String getAddress() + { + return null; + } + + @Override + public int getPort() + { + return 0; + } + }; + } + + @After + public void tearDown() throws Exception + { + EasyMock.verify(brokerSelector); + EasyMock.verify(selector); + } + + @Test + public void testFindServer() throws Exception + { + EasyMock.expect(brokerSelector.select(EasyMock.anyObject())).andReturn(new Pair("hotBroker", selector)); + EasyMock.replay(brokerSelector); + + EasyMock.expect(selector.pick()).andReturn(server).once(); + EasyMock.replay(selector); + + TierAwareQueryRunner queryRunner = new TierAwareQueryRunner( + null, + null, + null, + brokerSelector, + config + ); + + Server server = queryRunner.findServer( + new TimeBoundaryQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2011-08-31/2011-09-01"))), + null + ) + ); + + Assert.assertEquals("foo", server.getHost()); + } +} diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java new file mode 100644 index 00000000000..740edd8b55b --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -0,0 +1,105 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.logger.Logger; +import io.airlift.command.Command; +import io.druid.curator.discovery.DiscoveryModule; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; +import io.druid.query.MapQueryToolChestWarehouse; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.QueryResource; +import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.router.BrokerSelector; +import io.druid.server.router.CoordinatorRuleManager; +import io.druid.server.router.RouterQuerySegmentWalker; +import io.druid.server.router.TierConfig; +import org.eclipse.jetty.server.Server; + +import java.util.List; + +/** + */ +@Command( + name = "router", + description = "Experimental! Understands tiers and routes things to different brokers" +) +public class CliRouter extends ServerRunnable +{ + private static final Logger log = new Logger(CliRouter.class); + + public CliRouter() + { + super(log); + } + + @Override + protected List getModules() + { + return ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.router", TierConfig.class); + + binder.bind(CoordinatorRuleManager.class); + LifecycleModule.register(binder, CoordinatorRuleManager.class); + + binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + + binder.bind(BrokerSelector.class).in(ManageLifecycle.class); + binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class); + + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); + Jerseys.addResource(binder, QueryResource.class); + LifecycleModule.register(binder, QueryResource.class); + + LifecycleModule.register(binder, Server.class); + DiscoveryModule.register(binder, Self.class); + } + + @Provides + @ManageLifecycle + public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( + TierConfig config, + ServerDiscoveryFactory factory + + ) + { + return factory.createSelector(config.getCoordinatorServiceName()); + } + } + ); + } +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 7012e3a3f79..450aa36afe9 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -52,7 +52,7 @@ public class Main .withCommands( CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class, - CliBridge.class + CliBridge.class, CliRouter.class ); builder.withGroup("example")