merge from master
|
@ -37,14 +37,14 @@ public abstract class BaseQuery<T> implements Query<T>
|
||||||
{
|
{
|
||||||
public static String QUERYID = "queryId";
|
public static String QUERYID = "queryId";
|
||||||
private final DataSource dataSource;
|
private final DataSource dataSource;
|
||||||
private final Map<String, String> context;
|
private final Map<String, Object> context;
|
||||||
private final QuerySegmentSpec querySegmentSpec;
|
private final QuerySegmentSpec querySegmentSpec;
|
||||||
private volatile Duration duration;
|
private volatile Duration duration;
|
||||||
|
|
||||||
public BaseQuery(
|
public BaseQuery(
|
||||||
DataSource dataSource,
|
DataSource dataSource,
|
||||||
QuerySegmentSpec querySegmentSpec,
|
QuerySegmentSpec querySegmentSpec,
|
||||||
Map<String, String> context
|
Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
|
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
|
||||||
|
@ -102,28 +102,28 @@ public abstract class BaseQuery<T> implements Query<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public Map<String, String> getContext()
|
public Map<String, Object> getContext()
|
||||||
{
|
{
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getContextValue(String key)
|
public <ContextType> ContextType getContextValue(String key)
|
||||||
{
|
{
|
||||||
return context == null ? null : context.get(key);
|
return context == null ? null : (ContextType) context.get(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getContextValue(String key, String defaultValue)
|
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue)
|
||||||
{
|
{
|
||||||
String retVal = getContextValue(key);
|
ContextType retVal = getContextValue(key);
|
||||||
return retVal == null ? defaultValue : retVal;
|
return retVal == null ? defaultValue : retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Map<String, String> computeOverridenContext(Map<String, String> overrides)
|
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
|
||||||
{
|
{
|
||||||
Map<String, String> overridden = Maps.newTreeMap();
|
Map<String, Object> overridden = Maps.newTreeMap();
|
||||||
final Map<String, String> context = getContext();
|
final Map<String, Object> context = getContext();
|
||||||
if (context != null) {
|
if (context != null) {
|
||||||
overridden.putAll(context);
|
overridden.putAll(context);
|
||||||
}
|
}
|
||||||
|
@ -135,28 +135,41 @@ public abstract class BaseQuery<T> implements Query<T>
|
||||||
@Override
|
@Override
|
||||||
public String getId()
|
public String getId()
|
||||||
{
|
{
|
||||||
return getContextValue(QUERYID);
|
return (String) getContextValue(QUERYID);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Query withId(String id)
|
public Query withId(String id)
|
||||||
{
|
{
|
||||||
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
|
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
if (this == o) return true;
|
if (this == o) {
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
BaseQuery baseQuery = (BaseQuery) o;
|
BaseQuery baseQuery = (BaseQuery) o;
|
||||||
|
|
||||||
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
|
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
|
||||||
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
|
|
||||||
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
|
|
||||||
if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
|
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (querySegmentSpec != null
|
||||||
|
? !querySegmentSpec.equals(baseQuery.querySegmentSpec)
|
||||||
|
: baseQuery.querySegmentSpec != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Sequence<T> run(final Query<T> query)
|
public Sequence<T> run(final Query<T> query)
|
||||||
{
|
{
|
||||||
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
|
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
|
||||||
final Sequence<T> baseSequence = base.run(query);
|
final Sequence<T> baseSequence = base.run(query);
|
||||||
return new Sequence<T>()
|
return new Sequence<T>()
|
||||||
{
|
{
|
||||||
|
|
|
@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
|
||||||
@Override
|
@Override
|
||||||
public Sequence<T> run(Query<T> query)
|
public Sequence<T> run(Query<T> query)
|
||||||
{
|
{
|
||||||
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
|
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
|
||||||
return baseRunner.run(query);
|
return baseRunner.run(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
||||||
@Override
|
@Override
|
||||||
public Sequence<T> run(final Query<T> query)
|
public Sequence<T> run(final Query<T> query)
|
||||||
{
|
{
|
||||||
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
|
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
|
||||||
|
|
||||||
return new BaseSequence<T, Iterator<T>>(
|
return new BaseSequence<T, Iterator<T>>(
|
||||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||||
|
|
|
@ -304,7 +304,7 @@ public class Druids
|
||||||
private QueryGranularity granularity;
|
private QueryGranularity granularity;
|
||||||
private List<AggregatorFactory> aggregatorSpecs;
|
private List<AggregatorFactory> aggregatorSpecs;
|
||||||
private List<PostAggregator> postAggregatorSpecs;
|
private List<PostAggregator> postAggregatorSpecs;
|
||||||
private Map<String, String> context;
|
private Map<String, Object> context;
|
||||||
|
|
||||||
private TimeseriesQueryBuilder()
|
private TimeseriesQueryBuilder()
|
||||||
{
|
{
|
||||||
|
@ -384,7 +384,7 @@ public class Druids
|
||||||
return postAggregatorSpecs;
|
return postAggregatorSpecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getContext()
|
public Map<String, Object> getContext()
|
||||||
{
|
{
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
@ -465,7 +465,7 @@ public class Druids
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimeseriesQueryBuilder context(Map<String, String> c)
|
public TimeseriesQueryBuilder context(Map<String, Object> c)
|
||||||
{
|
{
|
||||||
context = c;
|
context = c;
|
||||||
return this;
|
return this;
|
||||||
|
@ -505,7 +505,7 @@ public class Druids
|
||||||
private QuerySegmentSpec querySegmentSpec;
|
private QuerySegmentSpec querySegmentSpec;
|
||||||
private List<String> dimensions;
|
private List<String> dimensions;
|
||||||
private SearchQuerySpec querySpec;
|
private SearchQuerySpec querySpec;
|
||||||
private Map<String, String> context;
|
private Map<String, Object> context;
|
||||||
|
|
||||||
public SearchQueryBuilder()
|
public SearchQueryBuilder()
|
||||||
{
|
{
|
||||||
|
@ -660,7 +660,7 @@ public class Druids
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SearchQueryBuilder context(Map<String, String> c)
|
public SearchQueryBuilder context(Map<String, Object> c)
|
||||||
{
|
{
|
||||||
context = c;
|
context = c;
|
||||||
return this;
|
return this;
|
||||||
|
@ -690,7 +690,7 @@ public class Druids
|
||||||
{
|
{
|
||||||
private DataSource dataSource;
|
private DataSource dataSource;
|
||||||
private QuerySegmentSpec querySegmentSpec;
|
private QuerySegmentSpec querySegmentSpec;
|
||||||
private Map<String, String> context;
|
private Map<String, Object> context;
|
||||||
|
|
||||||
public TimeBoundaryQueryBuilder()
|
public TimeBoundaryQueryBuilder()
|
||||||
{
|
{
|
||||||
|
@ -746,7 +746,7 @@ public class Druids
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimeBoundaryQueryBuilder context(Map<String, String> c)
|
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
|
||||||
{
|
{
|
||||||
context = c;
|
context = c;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -48,7 +48,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
||||||
@Override
|
@Override
|
||||||
public Sequence<T> run(final Query<T> query)
|
public Sequence<T> run(final Query<T> query)
|
||||||
{
|
{
|
||||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment"));
|
final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment"));
|
||||||
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
|
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
|
||||||
if (shouldFinalize) {
|
if (shouldFinalize) {
|
||||||
Function<T, T> finalizerFn;
|
Function<T, T> finalizerFn;
|
||||||
|
@ -100,7 +100,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
return Sequences.map(
|
return Sequences.map(
|
||||||
baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))),
|
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))),
|
||||||
finalizerFn
|
finalizerFn
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
|
||||||
query,
|
query,
|
||||||
configSupplier.get()
|
configSupplier.get()
|
||||||
);
|
);
|
||||||
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
|
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
|
||||||
|
|
||||||
if (Iterables.isEmpty(queryables)) {
|
if (Iterables.isEmpty(queryables)) {
|
||||||
log.warn("No queryables found.");
|
log.warn("No queryables found.");
|
||||||
|
|
|
@ -70,11 +70,11 @@ public interface Query<T>
|
||||||
|
|
||||||
public Duration getDuration();
|
public Duration getDuration();
|
||||||
|
|
||||||
public String getContextValue(String key);
|
public <ContextType> ContextType getContextValue(String key);
|
||||||
|
|
||||||
public String getContextValue(String key, String defaultValue);
|
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
|
||||||
|
|
||||||
public Query<T> withOverriddenContext(Map<String, String> contextOverride);
|
public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
|
||||||
|
|
||||||
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
|
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
|
||||||
|
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||||
@JsonProperty("having") HavingSpec havingSpec,
|
@JsonProperty("having") HavingSpec havingSpec,
|
||||||
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
||||||
@JsonProperty("orderBy") LimitSpec orderBySpec,
|
@JsonProperty("orderBy") LimitSpec orderBySpec,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -147,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||||
HavingSpec havingSpec,
|
HavingSpec havingSpec,
|
||||||
LimitSpec orderBySpec,
|
LimitSpec orderBySpec,
|
||||||
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
|
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
|
||||||
Map<String, String> context
|
Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
|
public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
|
||||||
{
|
{
|
||||||
return new GroupByQuery(
|
return new GroupByQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
@ -268,7 +268,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||||
private List<PostAggregator> postAggregatorSpecs;
|
private List<PostAggregator> postAggregatorSpecs;
|
||||||
private HavingSpec havingSpec;
|
private HavingSpec havingSpec;
|
||||||
|
|
||||||
private Map<String, String> context;
|
private Map<String, Object> context;
|
||||||
|
|
||||||
private LimitSpec limitSpec = null;
|
private LimitSpec limitSpec = null;
|
||||||
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
|
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
|
||||||
|
@ -443,7 +443,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setContext(Map<String, String> context)
|
public Builder setContext(Map<String, Object> context)
|
||||||
{
|
{
|
||||||
this.context = context;
|
this.context = context;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
|
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
|
||||||
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
|
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
|
||||||
private final Supplier<GroupByQueryConfig> configSupplier;
|
private final Supplier<GroupByQueryConfig> configSupplier;
|
||||||
private GroupByQueryEngine engine; // For running the outer query around a subquery
|
private GroupByQueryEngine engine; // For running the outer query around a subquery
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Row> run(Query<Row> input)
|
public Sequence<Row> run(Query<Row> input)
|
||||||
{
|
{
|
||||||
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
||||||
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
|
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
|
||||||
} else {
|
} else {
|
||||||
return runner.run(input);
|
return runner.run(input);
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
||||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||||
@JsonProperty("toInclude") ColumnIncluderator toInclude,
|
@JsonProperty("toInclude") ColumnIncluderator toInclude,
|
||||||
@JsonProperty("merge") Boolean merge,
|
@JsonProperty("merge") Boolean merge,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -77,7 +77,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
|
public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextOverride)
|
||||||
{
|
{
|
||||||
return new SegmentMetadataQuery(
|
return new SegmentMetadataQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
||||||
return runner.run(query);
|
return runner.run(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
|
||||||
|
|
||||||
return Sequences.map(
|
return Sequences.map(
|
||||||
runner.run(query.withLimit(config.getMaxSearchLimit())),
|
runner.run(query.withLimit(config.getMaxSearchLimit())),
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
||||||
@JsonProperty("searchDimensions") List<String> dimensions,
|
@JsonProperty("searchDimensions") List<String> dimensions,
|
||||||
@JsonProperty("query") SearchQuerySpec querySpec,
|
@JsonProperty("query") SearchQuerySpec querySpec,
|
||||||
@JsonProperty("sort") SearchSortSpec sortSpec,
|
@JsonProperty("sort") SearchSortSpec sortSpec,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -112,7 +112,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SearchQuery withOverriddenContext(Map<String, String> contextOverrides)
|
public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||||
{
|
{
|
||||||
return new SearchQuery(
|
return new SearchQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
||||||
@JsonProperty("dimensions") List<String> dimensions,
|
@JsonProperty("dimensions") List<String> dimensions,
|
||||||
@JsonProperty("metrics") List<String> metrics,
|
@JsonProperty("metrics") List<String> metrics,
|
||||||
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
|
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -120,7 +120,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SelectQuery withOverriddenContext(Map<String, String> contextOverrides)
|
public SelectQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||||
{
|
{
|
||||||
return new SelectQuery(
|
return new SelectQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
|
||||||
public TimeBoundaryQuery(
|
public TimeBoundaryQuery(
|
||||||
@JsonProperty("dataSource") DataSource dataSource,
|
@JsonProperty("dataSource") DataSource dataSource,
|
||||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(
|
super(
|
||||||
|
@ -78,7 +78,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TimeBoundaryQuery withOverriddenContext(Map<String, String> contextOverrides)
|
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||||
{
|
{
|
||||||
return new TimeBoundaryQuery(
|
return new TimeBoundaryQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
||||||
@JsonProperty("granularity") QueryGranularity granularity,
|
@JsonProperty("granularity") QueryGranularity granularity,
|
||||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
||||||
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -116,7 +116,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimeseriesQuery withOverriddenContext(Map<String, String> contextOverrides)
|
public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||||
{
|
{
|
||||||
return new TimeseriesQuery(
|
return new TimeseriesQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
||||||
@JsonProperty("granularity") QueryGranularity granularity,
|
@JsonProperty("granularity") QueryGranularity granularity,
|
||||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
||||||
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
||||||
@JsonProperty("context") Map<String, String> context
|
@JsonProperty("context") Map<String, Object> context
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(dataSource, querySegmentSpec, context);
|
super(dataSource, querySegmentSpec, context);
|
||||||
|
@ -178,7 +178,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopNQuery withOverriddenContext(Map<String, String> contextOverrides)
|
public TopNQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||||
{
|
{
|
||||||
return new TopNQuery(
|
return new TopNQuery(
|
||||||
getDataSource(),
|
getDataSource(),
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class TopNQueryBuilder
|
||||||
private QueryGranularity granularity;
|
private QueryGranularity granularity;
|
||||||
private List<AggregatorFactory> aggregatorSpecs;
|
private List<AggregatorFactory> aggregatorSpecs;
|
||||||
private List<PostAggregator> postAggregatorSpecs;
|
private List<PostAggregator> postAggregatorSpecs;
|
||||||
private Map<String, String> context;
|
private Map<String, Object> context;
|
||||||
|
|
||||||
public TopNQueryBuilder()
|
public TopNQueryBuilder()
|
||||||
{
|
{
|
||||||
|
@ -130,7 +130,7 @@ public class TopNQueryBuilder
|
||||||
return postAggregatorSpecs;
|
return postAggregatorSpecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, String> getContext()
|
public Map<String, Object> getContext()
|
||||||
{
|
{
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
@ -290,7 +290,7 @@ public class TopNQueryBuilder
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopNQueryBuilder context(Map<String, String> c)
|
public TopNQueryBuilder context(Map<String, Object> c)
|
||||||
{
|
{
|
||||||
context = c;
|
context = c;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||||
return runner.run(query);
|
return runner.run(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
|
||||||
|
|
||||||
return Sequences.map(
|
return Sequences.map(
|
||||||
runner.run(query.withThreshold(minTopNThreshold)),
|
runner.run(query.withThreshold(minTopNThreshold)),
|
||||||
|
|
|
@ -8,7 +8,7 @@ zip : sgmd0658-yang.zip
|
||||||
|
|
||||||
%.zip : %.pdf
|
%.zip : %.pdf
|
||||||
@rm -f dummy.ps
|
@rm -f dummy.ps
|
||||||
@touch dummy.ps
|
@echo 1234 > dummy.ps
|
||||||
zip $@ $*.pdf $*.tex dummy.ps
|
zip $@ $*.pdf $*.tex dummy.ps
|
||||||
|
|
||||||
clean :
|
clean :
|
||||||
|
|
|
@ -1,6 +1,17 @@
|
||||||
\documentclass{sig-alternate-2013}
|
\documentclass{sig-alternate-2013}
|
||||||
|
|
||||||
\input{sig-license.tex}
|
\newfont{\mycrnotice}{ptmr8t at 7pt}
|
||||||
|
\newfont{\myconfname}{ptmri8t at 7pt}
|
||||||
|
\let\crnotice\mycrnotice%
|
||||||
|
\let\confname\myconfname%
|
||||||
|
\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.}
|
||||||
|
\conferenceinfo{SIGMOD/PODS'14,}{June 22--27, 2014, Salt Lake City, UT, USA. \\
|
||||||
|
{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}}
|
||||||
|
\copyrightetc{ACM \the\acmcopyr}
|
||||||
|
\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\
|
||||||
|
\href{http://dx.doi.org/10.1145/2588555.2595631}{http://dx.doi.org/10.1145/2588555.2595631}}
|
||||||
|
\clubpenalty=10000
|
||||||
|
\widowpenalty = 10000
|
||||||
|
|
||||||
\usepackage{graphicx}
|
\usepackage{graphicx}
|
||||||
\usepackage{balance}
|
\usepackage{balance}
|
||||||
|
@ -50,8 +61,7 @@
|
||||||
\maketitle
|
\maketitle
|
||||||
|
|
||||||
\begin{abstract}
|
\begin{abstract}
|
||||||
Druid is an open
|
Druid is an open source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
|
||||||
source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
|
|
||||||
data store designed for real-time exploratory analytics on large data sets.
|
data store designed for real-time exploratory analytics on large data sets.
|
||||||
The system combines a column-oriented storage layout, a distributed,
|
The system combines a column-oriented storage layout, a distributed,
|
||||||
shared-nothing architecture, and an advanced indexing structure to allow for
|
shared-nothing architecture, and an advanced indexing structure to allow for
|
||||||
|
@ -120,7 +130,6 @@ service, and attempts to help inform anyone who faces a similar problem about a
|
||||||
potential method of solving it. Druid is deployed in production at several
|
potential method of solving it. Druid is deployed in production at several
|
||||||
technology
|
technology
|
||||||
companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.
|
companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.
|
||||||
|
|
||||||
The structure of the paper is as follows: we first describe the problem in
|
The structure of the paper is as follows: we first describe the problem in
|
||||||
Section \ref{sec:problem-definition}. Next, we detail system architecture from
|
Section \ref{sec:problem-definition}. Next, we detail system architecture from
|
||||||
the point of view of how data flows through the system in Section
|
the point of view of how data flows through the system in Section
|
||||||
|
@ -134,6 +143,21 @@ in Section \ref{sec:related}.
|
||||||
\section{Problem Definition}
|
\section{Problem Definition}
|
||||||
\label{sec:problem-definition}
|
\label{sec:problem-definition}
|
||||||
|
|
||||||
|
\begin{table*}
|
||||||
|
\centering
|
||||||
|
\begin{tabular}{| l | l | l | l | l | l | l | l |}
|
||||||
|
\hline
|
||||||
|
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
|
||||||
|
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
|
||||||
|
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
|
||||||
|
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
|
||||||
|
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
|
||||||
|
\end{tabular}
|
||||||
|
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
|
||||||
|
\label{tab:sample_data}
|
||||||
|
\end{table*}
|
||||||
|
|
||||||
|
|
||||||
Druid was originally designed to solve problems around ingesting and exploring
|
Druid was originally designed to solve problems around ingesting and exploring
|
||||||
large quantities of transactional events (log data). This form of timeseries
|
large quantities of transactional events (log data). This form of timeseries
|
||||||
data is commonly found in OLAP workflows and the nature of the data tends to be
|
data is commonly found in OLAP workflows and the nature of the data tends to be
|
||||||
|
@ -149,20 +173,6 @@ there are a set of metric columns that contain values (usually numeric) that
|
||||||
can be aggregated, such as the number of characters added or removed in an
|
can be aggregated, such as the number of characters added or removed in an
|
||||||
edit.
|
edit.
|
||||||
|
|
||||||
\begin{table*}
|
|
||||||
\centering
|
|
||||||
\begin{tabular}{| l | l | l | l | l | l | l | l |}
|
|
||||||
\hline
|
|
||||||
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
|
|
||||||
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
|
|
||||||
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
|
|
||||||
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
|
|
||||||
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
|
|
||||||
\end{tabular}
|
|
||||||
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
|
|
||||||
\label{tab:sample_data}
|
|
||||||
\end{table*}
|
|
||||||
|
|
||||||
Our goal is to rapidly compute drill-downs and aggregates over this data. We
|
Our goal is to rapidly compute drill-downs and aggregates over this data. We
|
||||||
want to answer questions like “How many edits were made on the page Justin
|
want to answer questions like “How many edits were made on the page Justin
|
||||||
Bieber from males in San Francisco?” and “What is the average number of
|
Bieber from males in San Francisco?” and “What is the average number of
|
||||||
|
@ -207,12 +217,13 @@ designed to perform a specific set of things. We believe this design separates
|
||||||
concerns and simplifies the complexity of the system. The different node types
|
concerns and simplifies the complexity of the system. The different node types
|
||||||
operate fairly independent of each other and there is minimal interaction
|
operate fairly independent of each other and there is minimal interaction
|
||||||
among them. Hence, intra-cluster communication failures have minimal impact
|
among them. Hence, intra-cluster communication failures have minimal impact
|
||||||
on data availability. To solve complex data analysis problems, the different
|
on data availability.
|
||||||
node types come together to form a fully working system. The name Druid comes
|
|
||||||
from the Druid class in many role-playing games: it is a shape-shifter, capable
|
To solve complex data analysis problems, the different
|
||||||
of taking on many different forms to fulfill various different roles in a
|
node types come together to form a fully working system. The composition of and
|
||||||
group. The composition of and flow of data in a Druid cluster are shown in
|
flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. The name Druid comes from the Druid class in many role-playing games: it is a
|
||||||
Figure~\ref{fig:cluster}.
|
shape-shifter, capable of taking on many different forms to fulfill various
|
||||||
|
different roles in a group.
|
||||||
|
|
||||||
\begin{figure*}
|
\begin{figure*}
|
||||||
\centering
|
\centering
|
||||||
|
@ -221,7 +232,6 @@ Figure~\ref{fig:cluster}.
|
||||||
\label{fig:cluster}
|
\label{fig:cluster}
|
||||||
\end{figure*}
|
\end{figure*}
|
||||||
|
|
||||||
\newpage
|
|
||||||
\subsection{Real-time Nodes}
|
\subsection{Real-time Nodes}
|
||||||
\label{sec:realtime}
|
\label{sec:realtime}
|
||||||
Real-time nodes encapsulate the functionality to ingest and query event
|
Real-time nodes encapsulate the functionality to ingest and query event
|
||||||
|
@ -249,10 +259,11 @@ in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
|
||||||
\begin{figure}
|
\begin{figure}
|
||||||
\centering
|
\centering
|
||||||
\includegraphics[width = 2.6in]{realtime_flow}
|
\includegraphics[width = 2.6in]{realtime_flow}
|
||||||
\caption{Real-time nodes first buffer events in memory. On a periodic basis,
|
\caption{Real-time nodes buffer events to an in-memory index, which is
|
||||||
the in-memory index is persisted to disk. On another periodic basis, all
|
regularly persisted to disk. On a periodic basis, persisted indexes are then merged
|
||||||
persisted indexes are merged together and handed off. Queries will hit the
|
together before getting handed off.
|
||||||
in-memory index and the persisted indexes.}
|
Queries will hit both the in-memory and persisted indexes.
|
||||||
|
}
|
||||||
\label{fig:realtime_flow}
|
\label{fig:realtime_flow}
|
||||||
\end{figure}
|
\end{figure}
|
||||||
|
|
||||||
|
@ -417,9 +428,7 @@ caching the results would be unreliable.
|
||||||
\begin{figure*}
|
\begin{figure*}
|
||||||
\centering
|
\centering
|
||||||
\includegraphics[width = 4.5in]{caching}
|
\includegraphics[width = 4.5in]{caching}
|
||||||
\caption{Broker nodes cache per segment results. Every Druid query is mapped to
|
\caption{Results are cached per segment. Queries combine cached results with results computed on historical and real-time nodes.}
|
||||||
a set of segments. Queries often combine cached segment results with those that
|
|
||||||
need to be computed on historical and real-time nodes.}
|
|
||||||
\label{fig:caching}
|
\label{fig:caching}
|
||||||
\end{figure*}
|
\end{figure*}
|
||||||
|
|
||||||
|
@ -791,7 +800,7 @@ involving all columns are very rare.
|
||||||
|
|
||||||
\begin{table}
|
\begin{table}
|
||||||
\centering
|
\centering
|
||||||
\begin{tabular}{| l | l | l |}
|
\scriptsize\begin{tabular}{| l | l | l |}
|
||||||
\hline
|
\hline
|
||||||
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
|
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
|
||||||
\texttt{a} & 25 & 21 \\ \hline
|
\texttt{a} & 25 & 21 \\ \hline
|
||||||
|
@ -803,6 +812,7 @@ involving all columns are very rare.
|
||||||
\texttt{g} & 26 & 18 \\ \hline
|
\texttt{g} & 26 & 18 \\ \hline
|
||||||
\texttt{h} & 78 & 14 \\ \hline
|
\texttt{h} & 78 & 14 \\ \hline
|
||||||
\end{tabular}
|
\end{tabular}
|
||||||
|
\normalsize
|
||||||
\caption{Characteristics of production data sources.}
|
\caption{Characteristics of production data sources.}
|
||||||
\label{tab:datasources}
|
\label{tab:datasources}
|
||||||
\end{table}
|
\end{table}
|
||||||
|
@ -900,6 +910,7 @@ well.
|
||||||
\begin{figure}
|
\begin{figure}
|
||||||
\centering
|
\centering
|
||||||
\includegraphics[width = 2.3in]{tpch_scaling}
|
\includegraphics[width = 2.3in]{tpch_scaling}
|
||||||
|
\includegraphics[width = 2.3in]{tpch_scaling_factor}
|
||||||
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
|
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
|
||||||
\label{fig:tpch_scaling}
|
\label{fig:tpch_scaling}
|
||||||
\end{figure}
|
\end{figure}
|
||||||
|
|
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 53 KiB |
Before Width: | Height: | Size: 28 KiB |
Before Width: | Height: | Size: 51 KiB |
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 36 KiB |
Before Width: | Height: | Size: 43 KiB |
|
@ -1,12 +0,0 @@
|
||||||
\newfont{\mycrnotice}{ptmr8t at 7pt}
|
|
||||||
\newfont{\myconfname}{ptmri8t at 7pt}
|
|
||||||
\let\crnotice\mycrnotice%
|
|
||||||
\let\confname\myconfname%
|
|
||||||
\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.}
|
|
||||||
\conferenceinfo{SIGMOD'14,}{June 22--27, 2014, Snowbird, UT, USA. \\
|
|
||||||
{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}}
|
|
||||||
\copyrightetc{ACM \the\acmcopyr}
|
|
||||||
\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\
|
|
||||||
Include the http://DOI string/url which is specific for your submission and included in the ACM rightsreview confirmation email upon completing your ACM form}
|
|
||||||
\clubpenalty=10000
|
|
||||||
\widowpenalty = 10000
|
|
|
@ -132,7 +132,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||||
|
|
||||||
|
|
||||||
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
|
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
|
||||||
|
|
||||||
final String priority = query.getContextValue("priority", "0");
|
final String priority = query.getContextValue("priority", "0");
|
||||||
contextBuilder.put("priority", priority);
|
contextBuilder.put("priority", priority);
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class DatabaseRuleManager
|
||||||
|
|
||||||
this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
|
this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
|
||||||
|
|
||||||
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultTier(), jsonMapper);
|
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper);
|
||||||
ScheduledExecutors.scheduleWithFixedDelay(
|
ScheduledExecutors.scheduleWithFixedDelay(
|
||||||
exec,
|
exec,
|
||||||
new Duration(0),
|
new Duration(0),
|
||||||
|
@ -274,8 +274,8 @@ public class DatabaseRuleManager
|
||||||
if (theRules.get(dataSource) != null) {
|
if (theRules.get(dataSource) != null) {
|
||||||
retVal.addAll(theRules.get(dataSource));
|
retVal.addAll(theRules.get(dataSource));
|
||||||
}
|
}
|
||||||
if (theRules.get(config.get().getDefaultTier()) != null) {
|
if (theRules.get(config.get().getDefaultRule()) != null) {
|
||||||
retVal.addAll(theRules.get(config.get().getDefaultTier()));
|
retVal.addAll(theRules.get(config.get().getDefaultRule()));
|
||||||
}
|
}
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,14 +27,14 @@ import org.joda.time.Period;
|
||||||
public class DatabaseRuleManagerConfig
|
public class DatabaseRuleManagerConfig
|
||||||
{
|
{
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private String defaultTier = "_default";
|
private String defaultRule = "_default";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private Period pollDuration = new Period("PT1M");
|
private Period pollDuration = new Period("PT1M");
|
||||||
|
|
||||||
public String getDefaultTier()
|
public String getDefaultRule()
|
||||||
{
|
{
|
||||||
return defaultTier;
|
return defaultRule;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Period getPollDuration()
|
public Period getPollDuration()
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class DatabaseRuleManagerProvider implements Provider<DatabaseRuleManager
|
||||||
{
|
{
|
||||||
dbConnector.createRulesTable();
|
dbConnector.createRulesTable();
|
||||||
DatabaseRuleManager.createDefaultRule(
|
DatabaseRuleManager.createDefaultRule(
|
||||||
dbConnector.getDBI(), dbTables.get().getRulesTable(), config.get().getDefaultTier(), jsonMapper
|
dbConnector.getDBI(), dbTables.get().getRulesTable(), config.get().getDefaultRule(), jsonMapper
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,6 @@ import io.druid.curator.CuratorModule;
|
||||||
import io.druid.curator.discovery.DiscoveryModule;
|
import io.druid.curator.discovery.DiscoveryModule;
|
||||||
import io.druid.guice.AWSModule;
|
import io.druid.guice.AWSModule;
|
||||||
import io.druid.guice.AnnouncerModule;
|
import io.druid.guice.AnnouncerModule;
|
||||||
import io.druid.guice.LocalDataStorageDruidModule;
|
|
||||||
import io.druid.guice.DbConnectorModule;
|
import io.druid.guice.DbConnectorModule;
|
||||||
import io.druid.guice.DruidGuiceExtensions;
|
import io.druid.guice.DruidGuiceExtensions;
|
||||||
import io.druid.guice.DruidProcessingModule;
|
import io.druid.guice.DruidProcessingModule;
|
||||||
|
@ -47,6 +46,7 @@ import io.druid.guice.IndexingServiceDiscoveryModule;
|
||||||
import io.druid.guice.JacksonConfigManagerModule;
|
import io.druid.guice.JacksonConfigManagerModule;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.LifecycleModule;
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.LocalDataStorageDruidModule;
|
||||||
import io.druid.guice.QueryRunnerFactoryModule;
|
import io.druid.guice.QueryRunnerFactoryModule;
|
||||||
import io.druid.guice.QueryableModule;
|
import io.druid.guice.QueryableModule;
|
||||||
import io.druid.guice.ServerModule;
|
import io.druid.guice.ServerModule;
|
||||||
|
@ -107,9 +107,10 @@ public class Initialization
|
||||||
/**
|
/**
|
||||||
* @param clazz Module class
|
* @param clazz Module class
|
||||||
* @param <T>
|
* @param <T>
|
||||||
|
*
|
||||||
* @return Returns the set of modules loaded.
|
* @return Returns the set of modules loaded.
|
||||||
*/
|
*/
|
||||||
public static<T> Set<T> getLoadedModules(Class<T> clazz)
|
public static <T> Set<T> getLoadedModules(Class<T> clazz)
|
||||||
{
|
{
|
||||||
Set<T> retVal = extensionsMap.get(clazz);
|
Set<T> retVal = extensionsMap.get(clazz);
|
||||||
if (retVal == null) {
|
if (retVal == null) {
|
||||||
|
@ -190,22 +191,29 @@ public class Initialization
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
|
try {
|
||||||
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
|
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
|
||||||
for (Artifact artifact : artifacts) {
|
|
||||||
if (!exclusions.contains(artifact.getGroupId())) {
|
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
|
||||||
urls.add(artifact.getFile().toURI().toURL());
|
for (Artifact artifact : artifacts) {
|
||||||
} else {
|
if (!exclusions.contains(artifact.getGroupId())) {
|
||||||
log.debug("Skipped Artifact[%s]", artifact);
|
urls.add(artifact.getFile().toURI().toURL());
|
||||||
|
} else {
|
||||||
|
log.debug("Skipped Artifact[%s]", artifact);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for (URL url : urls) {
|
for (URL url : urls) {
|
||||||
log.info("Added URL[%s]", url);
|
log.info("Added URL[%s]", url);
|
||||||
}
|
}
|
||||||
|
|
||||||
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
|
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
|
||||||
loadersMap.put(coordinate, loader);
|
loadersMap.put(coordinate, loader);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Unable to resolve artifacts for [%s].", dependencyRequest);
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return loader;
|
return loader;
|
||||||
}
|
}
|
||||||
|
@ -232,9 +240,9 @@ public class Initialization
|
||||||
URI u = new URI(uri);
|
URI u = new URI(uri);
|
||||||
Repository r = new Repository(uri);
|
Repository r = new Repository(uri);
|
||||||
|
|
||||||
if(u.getUserInfo() != null) {
|
if (u.getUserInfo() != null) {
|
||||||
String[] auth = u.getUserInfo().split(":", 2);
|
String[] auth = u.getUserInfo().split(":", 2);
|
||||||
if(auth.length == 2) {
|
if (auth.length == 2) {
|
||||||
r.setUsername(auth[0]);
|
r.setUsername(auth[0]);
|
||||||
r.setPassword(auth[1]);
|
r.setPassword(auth[1]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -247,7 +255,7 @@ public class Initialization
|
||||||
}
|
}
|
||||||
remoteRepositories.add(r);
|
remoteRepositories.add(r);
|
||||||
}
|
}
|
||||||
catch(URISyntaxException e) {
|
catch (URISyntaxException e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,28 +269,30 @@ public class Initialization
|
||||||
|
|
||||||
PrintStream oldOut = System.out;
|
PrintStream oldOut = System.out;
|
||||||
try {
|
try {
|
||||||
System.setOut(new PrintStream(
|
System.setOut(
|
||||||
new OutputStream()
|
new PrintStream(
|
||||||
{
|
new OutputStream()
|
||||||
@Override
|
{
|
||||||
public void write(int b) throws IOException
|
@Override
|
||||||
{
|
public void write(int b) throws IOException
|
||||||
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] b) throws IOException
|
public void write(byte[] b) throws IOException
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] b, int off, int len) throws IOException
|
public void write(byte[] b, int off, int len) throws IOException
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
));
|
)
|
||||||
|
);
|
||||||
return new DefaultTeslaAether(
|
return new DefaultTeslaAether(
|
||||||
config.getLocalRepository(),
|
config.getLocalRepository(),
|
||||||
remoteRepositories.toArray(new Repository[remoteRepositories.size()])
|
remoteRepositories.toArray(new Repository[remoteRepositories.size()])
|
||||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.server.coordinator.rules;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -39,4 +40,10 @@ public class ForeverDropRule extends DropRule
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -66,4 +67,10 @@ public class ForeverLoadRule extends LoadRule
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,12 @@ public class IntervalDropRule extends DropRule
|
||||||
@Override
|
@Override
|
||||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
return interval.contains(segment.getInterval());
|
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return interval.contains(theInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,13 @@ public class IntervalLoadRule extends LoadRule
|
||||||
@Override
|
@Override
|
||||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
return interval.contains(segment.getInterval());
|
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return interval.contains(theInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -55,8 +55,14 @@ public class PeriodDropRule extends DropRule
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
final Interval currInterval = new Interval(period, referenceTimestamp);
|
final Interval currInterval = new Interval(period, referenceTimestamp);
|
||||||
return currInterval.contains(segment.getInterval());
|
return currInterval.contains(theInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,8 +86,14 @@ public class PeriodLoadRule extends LoadRule
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
final Interval currInterval = new Interval(period, referenceTimestamp);
|
final Interval currInterval = new Interval(period, referenceTimestamp);
|
||||||
return currInterval.overlaps(segment.getInterval());
|
return currInterval.overlaps(interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import io.druid.server.coordinator.DruidCoordinator;
|
||||||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||||
import io.druid.timeline.DataSegment;
|
import io.druid.timeline.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -37,13 +38,15 @@ import org.joda.time.DateTime;
|
||||||
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
||||||
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
|
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
|
||||||
@JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class)
|
@JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class)
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
public interface Rule
|
public interface Rule
|
||||||
{
|
{
|
||||||
public String getType();
|
public String getType();
|
||||||
|
|
||||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp);
|
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp);
|
||||||
|
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp);
|
||||||
|
|
||||||
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
|
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,178 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStart;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStop;
|
||||||
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import io.druid.concurrent.Execs;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.server.coordinator.rules.LoadRule;
|
||||||
|
import io.druid.server.coordinator.rules.Rule;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class BrokerSelector<T>
|
||||||
|
{
|
||||||
|
private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
|
||||||
|
|
||||||
|
private final CoordinatorRuleManager ruleManager;
|
||||||
|
private final TierConfig tierConfig;
|
||||||
|
private final ServerDiscoveryFactory serverDiscoveryFactory;
|
||||||
|
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
private volatile boolean started = false;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public BrokerSelector(
|
||||||
|
CoordinatorRuleManager ruleManager,
|
||||||
|
TierConfig tierConfig,
|
||||||
|
ServerDiscoveryFactory serverDiscoveryFactory
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.ruleManager = ruleManager;
|
||||||
|
this.tierConfig = tierConfig;
|
||||||
|
this.serverDiscoveryFactory = serverDiscoveryFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@LifecycleStart
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
|
||||||
|
ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue());
|
||||||
|
selector.start();
|
||||||
|
selectorMap.put(entry.getValue(), selector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@LifecycleStop
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (!started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (ServerDiscoverySelector selector : selectorMap.values()) {
|
||||||
|
selector.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
started = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (!ruleManager.isStarted() || !started) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
|
||||||
|
|
||||||
|
// find the rule that can apply to the entire set of intervals
|
||||||
|
DateTime now = new DateTime();
|
||||||
|
int lastRulePosition = -1;
|
||||||
|
LoadRule baseRule = null;
|
||||||
|
|
||||||
|
for (Interval interval : query.getIntervals()) {
|
||||||
|
int currRulePosition = 0;
|
||||||
|
for (Rule rule : rules) {
|
||||||
|
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
|
||||||
|
lastRulePosition = currRulePosition;
|
||||||
|
baseRule = (LoadRule) rule;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
currRulePosition++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (baseRule == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// in the baseRule, find the broker of highest priority
|
||||||
|
String brokerServiceName = null;
|
||||||
|
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
|
||||||
|
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
|
||||||
|
brokerServiceName = entry.getValue();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (brokerServiceName == null) {
|
||||||
|
log.makeAlert(
|
||||||
|
"WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
|
||||||
|
query.getDataSource(),
|
||||||
|
query.getIntervals(),
|
||||||
|
tierConfig.getDefaultBrokerServiceName()
|
||||||
|
).emit();
|
||||||
|
brokerServiceName = tierConfig.getDefaultBrokerServiceName();
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);
|
||||||
|
|
||||||
|
if (retVal == null) {
|
||||||
|
log.makeAlert(
|
||||||
|
"WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]",
|
||||||
|
brokerServiceName,
|
||||||
|
tierConfig.getDefaultBrokerServiceName()
|
||||||
|
).emit();
|
||||||
|
retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName());
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Pair<>(brokerServiceName, retVal);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.api.client.util.Charsets;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStart;
|
||||||
|
import com.metamx.common.lifecycle.LifecycleStop;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
|
import io.druid.client.selector.Server;
|
||||||
|
import io.druid.concurrent.Execs;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.guice.annotations.Global;
|
||||||
|
import io.druid.guice.annotations.Json;
|
||||||
|
import io.druid.server.coordinator.rules.Rule;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
@ManageLifecycle
|
||||||
|
public class CoordinatorRuleManager
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(CoordinatorRuleManager.class);
|
||||||
|
|
||||||
|
private final HttpClient httpClient;
|
||||||
|
private final ObjectMapper jsonMapper;
|
||||||
|
private final Supplier<TierConfig> config;
|
||||||
|
private final ServerDiscoverySelector selector;
|
||||||
|
|
||||||
|
private final StatusResponseHandler responseHandler;
|
||||||
|
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
|
||||||
|
|
||||||
|
private volatile ScheduledExecutorService exec;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
private volatile boolean started = false;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public CoordinatorRuleManager(
|
||||||
|
@Global HttpClient httpClient,
|
||||||
|
@Json ObjectMapper jsonMapper,
|
||||||
|
Supplier<TierConfig> config,
|
||||||
|
ServerDiscoverySelector selector
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.httpClient = httpClient;
|
||||||
|
this.jsonMapper = jsonMapper;
|
||||||
|
this.config = config;
|
||||||
|
this.selector = selector;
|
||||||
|
|
||||||
|
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
|
||||||
|
this.rules = new AtomicReference<>(
|
||||||
|
new ConcurrentHashMap<String, List<Rule>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@LifecycleStart
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.exec = Execs.scheduledSingleThreaded("CoordinatorRuleManager-Exec--%d");
|
||||||
|
|
||||||
|
ScheduledExecutors.scheduleWithFixedDelay(
|
||||||
|
exec,
|
||||||
|
new Duration(0),
|
||||||
|
config.get().getPollPeriod().toStandardDuration(),
|
||||||
|
new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
poll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@LifecycleStop
|
||||||
|
public void stop()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (!started) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
rules.set(new ConcurrentHashMap<String, List<Rule>>());
|
||||||
|
|
||||||
|
started = false;
|
||||||
|
exec.shutdownNow();
|
||||||
|
exec = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isStarted()
|
||||||
|
{
|
||||||
|
return started;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void poll()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
String url = getRuleURL();
|
||||||
|
if (url == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
StatusResponseHolder response = httpClient.get(new URL(url))
|
||||||
|
.go(responseHandler)
|
||||||
|
.get();
|
||||||
|
|
||||||
|
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
|
||||||
|
(Map<String, List<Rule>>) jsonMapper.readValue(
|
||||||
|
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
log.info("Got [%,d] rules", newRules.keySet().size());
|
||||||
|
|
||||||
|
rules.set(newRules);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Exception while polling for rules");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Rule> getRulesWithDefault(final String dataSource)
|
||||||
|
{
|
||||||
|
List<Rule> retVal = Lists.newArrayList();
|
||||||
|
Map<String, List<Rule>> theRules = rules.get();
|
||||||
|
if (theRules.get(dataSource) != null) {
|
||||||
|
retVal.addAll(theRules.get(dataSource));
|
||||||
|
}
|
||||||
|
if (theRules.get(config.get().getDefaultRule()) != null) {
|
||||||
|
retVal.addAll(theRules.get(config.get().getDefaultRule()));
|
||||||
|
}
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getRuleURL()
|
||||||
|
{
|
||||||
|
Server server = selector.pick();
|
||||||
|
|
||||||
|
if (server == null) {
|
||||||
|
log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return String.format("http://%s%s", server.getHost(), config.get().getRulesEndpoint());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||||
|
import io.druid.guice.annotations.Global;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.QuerySegmentWalker;
|
||||||
|
import io.druid.query.QueryToolChestWarehouse;
|
||||||
|
import io.druid.query.SegmentDescriptor;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class RouterQuerySegmentWalker implements QuerySegmentWalker
|
||||||
|
{
|
||||||
|
private final QueryToolChestWarehouse warehouse;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
private final HttpClient httpClient;
|
||||||
|
private final BrokerSelector brokerSelector;
|
||||||
|
private final TierConfig tierConfig;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public RouterQuerySegmentWalker(
|
||||||
|
QueryToolChestWarehouse warehouse,
|
||||||
|
ObjectMapper objectMapper,
|
||||||
|
@Global HttpClient httpClient,
|
||||||
|
BrokerSelector brokerSelector,
|
||||||
|
TierConfig tierConfig
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.warehouse = warehouse;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
this.httpClient = httpClient;
|
||||||
|
this.brokerSelector = brokerSelector;
|
||||||
|
this.tierConfig = tierConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||||
|
{
|
||||||
|
return makeRunner();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||||
|
{
|
||||||
|
return makeRunner();
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> QueryRunner<T> makeRunner()
|
||||||
|
{
|
||||||
|
return new TierAwareQueryRunner<T>(
|
||||||
|
warehouse,
|
||||||
|
objectMapper,
|
||||||
|
httpClient,
|
||||||
|
brokerSelector,
|
||||||
|
tierConfig
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.common.guava.Sequence;
|
||||||
|
import com.metamx.common.guava.Sequences;
|
||||||
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import io.druid.client.DirectDruidClient;
|
||||||
|
import io.druid.client.selector.Server;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.query.QueryRunner;
|
||||||
|
import io.druid.query.QueryToolChestWarehouse;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TierAwareQueryRunner<T> implements QueryRunner<T>
|
||||||
|
{
|
||||||
|
private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
|
||||||
|
|
||||||
|
private final QueryToolChestWarehouse warehouse;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
private final HttpClient httpClient;
|
||||||
|
private final BrokerSelector<T> brokerSelector;
|
||||||
|
private final TierConfig tierConfig;
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
|
||||||
|
|
||||||
|
public TierAwareQueryRunner(
|
||||||
|
QueryToolChestWarehouse warehouse,
|
||||||
|
ObjectMapper objectMapper,
|
||||||
|
HttpClient httpClient,
|
||||||
|
BrokerSelector<T> brokerSelector,
|
||||||
|
TierConfig tierConfig
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.warehouse = warehouse;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
this.httpClient = httpClient;
|
||||||
|
this.brokerSelector = brokerSelector;
|
||||||
|
this.tierConfig = tierConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Server findServer(Query<T> query)
|
||||||
|
{
|
||||||
|
final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
|
||||||
|
final String brokerServiceName = selected.lhs;
|
||||||
|
final ServerDiscoverySelector selector = selected.rhs;
|
||||||
|
|
||||||
|
Server server = selector.pick();
|
||||||
|
if (server == null) {
|
||||||
|
log.error(
|
||||||
|
"WTF?! No server found for brokerServiceName[%s]. Using backup",
|
||||||
|
brokerServiceName
|
||||||
|
);
|
||||||
|
|
||||||
|
server = serverBackup.get(brokerServiceName);
|
||||||
|
|
||||||
|
if (server == null) {
|
||||||
|
log.makeAlert(
|
||||||
|
"WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
|
||||||
|
brokerServiceName,
|
||||||
|
tierConfig.getDefaultBrokerServiceName()
|
||||||
|
).emit();
|
||||||
|
|
||||||
|
server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
serverBackup.put(brokerServiceName, server);
|
||||||
|
}
|
||||||
|
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Sequence<T> run(Query<T> query)
|
||||||
|
{
|
||||||
|
Server server = findServer(query);
|
||||||
|
|
||||||
|
if (server == null) {
|
||||||
|
log.makeAlert(
|
||||||
|
"Catastrophic failure! No servers found at all! Failing request!"
|
||||||
|
).emit();
|
||||||
|
return Sequences.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryRunner<T> client = new DirectDruidClient<T>(
|
||||||
|
warehouse,
|
||||||
|
objectMapper,
|
||||||
|
httpClient,
|
||||||
|
server.getHost()
|
||||||
|
);
|
||||||
|
|
||||||
|
return client.run(query);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.druid.client.DruidServer;
|
||||||
|
import org.joda.time.Period;
|
||||||
|
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TierConfig
|
||||||
|
{
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private String defaultBrokerServiceName = "";
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private LinkedHashMap<String, String> tierToBrokerMap;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private String defaultRule = "_default";
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private String rulesEndpoint = "/druid/coordinator/v1/rules";
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private String coordinatorServiceName = null;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private Period pollPeriod = new Period("PT1M");
|
||||||
|
|
||||||
|
// tier, <bard, numThreads>
|
||||||
|
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||||
|
{
|
||||||
|
return tierToBrokerMap == null ? new LinkedHashMap<>(
|
||||||
|
ImmutableMap.of(
|
||||||
|
DruidServer.DEFAULT_TIER, defaultBrokerServiceName
|
||||||
|
)
|
||||||
|
) : tierToBrokerMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDefaultBrokerServiceName()
|
||||||
|
{
|
||||||
|
return defaultBrokerServiceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDefaultRule()
|
||||||
|
{
|
||||||
|
return defaultRule;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRulesEndpoint()
|
||||||
|
{
|
||||||
|
return rulesEndpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCoordinatorServiceName()
|
||||||
|
{
|
||||||
|
return coordinatorServiceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Period getPollPeriod()
|
||||||
|
{
|
||||||
|
return pollPeriod;
|
||||||
|
}
|
||||||
|
}
|
|
@ -122,7 +122,7 @@ public class CachingClusteredClientTest
|
||||||
*/
|
*/
|
||||||
private static final int RANDOMNESS = 10;
|
private static final int RANDOMNESS = 10;
|
||||||
|
|
||||||
public static final ImmutableMap<String, String> CONTEXT = ImmutableMap.of();
|
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
|
||||||
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
|
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
|
||||||
public static final String DATA_SOURCE = "test";
|
public static final String DATA_SOURCE = "test";
|
||||||
|
|
||||||
|
@ -326,7 +326,7 @@ public class CachingClusteredClientTest
|
||||||
testQueryCaching(
|
testQueryCaching(
|
||||||
1,
|
1,
|
||||||
true,
|
true,
|
||||||
builder.context(ImmutableMap.of("useCache", "false",
|
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
|
||||||
"populateCache", "true")).build(),
|
"populateCache", "true")).build(),
|
||||||
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
||||||
);
|
);
|
||||||
|
@ -340,7 +340,7 @@ public class CachingClusteredClientTest
|
||||||
testQueryCaching(
|
testQueryCaching(
|
||||||
1,
|
1,
|
||||||
false,
|
false,
|
||||||
builder.context(ImmutableMap.of("useCache", "false",
|
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
|
||||||
"populateCache", "false")).build(),
|
"populateCache", "false")).build(),
|
||||||
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
||||||
);
|
);
|
||||||
|
@ -352,7 +352,7 @@ public class CachingClusteredClientTest
|
||||||
testQueryCaching(
|
testQueryCaching(
|
||||||
1,
|
1,
|
||||||
false,
|
false,
|
||||||
builder.context(ImmutableMap.of("useCache", "true",
|
builder.context(ImmutableMap.<String, Object>of("useCache", "true",
|
||||||
"populateCache", "false")).build(),
|
"populateCache", "false")).build(),
|
||||||
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
|
||||||
);
|
);
|
||||||
|
|
|
@ -122,6 +122,12 @@ public class LoadRuleTest
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
DruidCluster druidCluster = new DruidCluster(
|
DruidCluster druidCluster = new DruidCluster(
|
||||||
|
@ -214,6 +220,12 @@ public class LoadRuleTest
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
DruidServer server1 = new DruidServer(
|
DruidServer server1 = new DruidServer(
|
||||||
|
|
|
@ -0,0 +1,230 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import io.druid.client.DruidServer;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.guice.annotations.Global;
|
||||||
|
import io.druid.guice.annotations.Json;
|
||||||
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.TableDataSource;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import io.druid.server.coordinator.rules.IntervalLoadRule;
|
||||||
|
import io.druid.server.coordinator.rules.Rule;
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class BrokerSelectorTest
|
||||||
|
{
|
||||||
|
private ServerDiscoveryFactory factory;
|
||||||
|
private ServerDiscoverySelector selector;
|
||||||
|
private BrokerSelector brokerSelector;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
|
||||||
|
selector = EasyMock.createMock(ServerDiscoverySelector.class);
|
||||||
|
|
||||||
|
brokerSelector = new BrokerSelector(
|
||||||
|
new TestRuleManager(null, null, null, null),
|
||||||
|
new TierConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||||
|
{
|
||||||
|
return new LinkedHashMap<String, String>(
|
||||||
|
ImmutableMap.<String, String>of(
|
||||||
|
"hot", "hotBroker",
|
||||||
|
"medium", "mediumBroker",
|
||||||
|
DruidServer.DEFAULT_TIER, "coldBroker"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDefaultBrokerServiceName()
|
||||||
|
{
|
||||||
|
return "hotBroker";
|
||||||
|
}
|
||||||
|
},
|
||||||
|
factory
|
||||||
|
);
|
||||||
|
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
|
||||||
|
EasyMock.replay(factory);
|
||||||
|
|
||||||
|
selector.start();
|
||||||
|
EasyMock.expectLastCall().atLeastOnce();
|
||||||
|
selector.stop();
|
||||||
|
EasyMock.expectLastCall().atLeastOnce();
|
||||||
|
EasyMock.replay(selector);
|
||||||
|
|
||||||
|
brokerSelector.start();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
brokerSelector.stop();
|
||||||
|
|
||||||
|
EasyMock.verify(selector);
|
||||||
|
EasyMock.verify(factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicSelect() throws Exception
|
||||||
|
{
|
||||||
|
String brokerName = (String) brokerSelector.select(
|
||||||
|
new TimeBoundaryQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
).lhs;
|
||||||
|
|
||||||
|
Assert.assertEquals("coldBroker", brokerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicSelect2() throws Exception
|
||||||
|
{
|
||||||
|
String brokerName = (String) brokerSelector.select(
|
||||||
|
new TimeBoundaryQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
).lhs;
|
||||||
|
|
||||||
|
Assert.assertEquals("hotBroker", brokerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectMatchesNothing() throws Exception
|
||||||
|
{
|
||||||
|
Pair retVal = brokerSelector.select(
|
||||||
|
new TimeBoundaryQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(null, retVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectMultiInterval() throws Exception
|
||||||
|
{
|
||||||
|
String brokerName = (String) brokerSelector.select(
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource("test")
|
||||||
|
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||||
|
.intervals(
|
||||||
|
new MultipleIntervalSegmentSpec(
|
||||||
|
Arrays.<Interval>asList(
|
||||||
|
new Interval("2013-08-31/2013-09-01"),
|
||||||
|
new Interval("2012-08-31/2012-09-01"),
|
||||||
|
new Interval("2011-08-31/2011-09-01")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).build()
|
||||||
|
).lhs;
|
||||||
|
|
||||||
|
Assert.assertEquals("coldBroker", brokerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectMultiInterval2() throws Exception
|
||||||
|
{
|
||||||
|
String brokerName = (String) brokerSelector.select(
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource("test")
|
||||||
|
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||||
|
.intervals(
|
||||||
|
new MultipleIntervalSegmentSpec(
|
||||||
|
Arrays.<Interval>asList(
|
||||||
|
new Interval("2011-08-31/2011-09-01"),
|
||||||
|
new Interval("2012-08-31/2012-09-01"),
|
||||||
|
new Interval("2013-08-31/2013-09-01")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).build()
|
||||||
|
).lhs;
|
||||||
|
|
||||||
|
Assert.assertEquals("coldBroker", brokerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestRuleManager extends CoordinatorRuleManager
|
||||||
|
{
|
||||||
|
public TestRuleManager(
|
||||||
|
@Global HttpClient httpClient,
|
||||||
|
@Json ObjectMapper jsonMapper,
|
||||||
|
Supplier<TierConfig> config,
|
||||||
|
ServerDiscoverySelector selector
|
||||||
|
)
|
||||||
|
{
|
||||||
|
super(httpClient, jsonMapper, config, selector);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStarted()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Rule> getRulesWithDefault(String dataSource)
|
||||||
|
{
|
||||||
|
return Arrays.<Rule>asList(
|
||||||
|
new IntervalLoadRule(new Interval("2013/2014"), ImmutableMap.<String, Integer>of("hot", 1), null, null),
|
||||||
|
new IntervalLoadRule(new Interval("2012/2013"), ImmutableMap.<String, Integer>of("medium", 1), null, null),
|
||||||
|
new IntervalLoadRule(
|
||||||
|
new Interval("2011/2012"),
|
||||||
|
ImmutableMap.<String, Integer>of(DruidServer.DEFAULT_TIER, 1),
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,139 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.router;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
|
import io.druid.client.DruidServer;
|
||||||
|
import io.druid.client.selector.Server;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.query.TableDataSource;
|
||||||
|
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||||
|
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class TierAwareQueryRunnerTest
|
||||||
|
{
|
||||||
|
private ServerDiscoverySelector selector;
|
||||||
|
private BrokerSelector brokerSelector;
|
||||||
|
private TierConfig config;
|
||||||
|
private Server server;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
selector = EasyMock.createMock(ServerDiscoverySelector.class);
|
||||||
|
brokerSelector = EasyMock.createMock(BrokerSelector.class);
|
||||||
|
|
||||||
|
config = new TierConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||||
|
{
|
||||||
|
return new LinkedHashMap<>(
|
||||||
|
ImmutableMap.<String, String>of(
|
||||||
|
"hot", "hotBroker",
|
||||||
|
"medium", "mediumBroker",
|
||||||
|
DruidServer.DEFAULT_TIER, "coldBroker"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDefaultBrokerServiceName()
|
||||||
|
{
|
||||||
|
return "hotBroker";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
server = new Server()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String getScheme()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getHost()
|
||||||
|
{
|
||||||
|
return "foo";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAddress()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPort()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
EasyMock.verify(brokerSelector);
|
||||||
|
EasyMock.verify(selector);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindServer() throws Exception
|
||||||
|
{
|
||||||
|
EasyMock.expect(brokerSelector.select(EasyMock.<Query>anyObject())).andReturn(new Pair("hotBroker", selector));
|
||||||
|
EasyMock.replay(brokerSelector);
|
||||||
|
|
||||||
|
EasyMock.expect(selector.pick()).andReturn(server).once();
|
||||||
|
EasyMock.replay(selector);
|
||||||
|
|
||||||
|
TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
brokerSelector,
|
||||||
|
config
|
||||||
|
);
|
||||||
|
|
||||||
|
Server server = queryRunner.findServer(
|
||||||
|
new TimeBoundaryQuery(
|
||||||
|
new TableDataSource("test"),
|
||||||
|
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals("foo", server.getHost());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.cli;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.Provides;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.airlift.command.Command;
|
||||||
|
import io.druid.curator.discovery.DiscoveryModule;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||||
|
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.query.MapQueryToolChestWarehouse;
|
||||||
|
import io.druid.query.QuerySegmentWalker;
|
||||||
|
import io.druid.query.QueryToolChestWarehouse;
|
||||||
|
import io.druid.server.QueryResource;
|
||||||
|
import io.druid.server.initialization.JettyServerInitializer;
|
||||||
|
import io.druid.server.router.BrokerSelector;
|
||||||
|
import io.druid.server.router.CoordinatorRuleManager;
|
||||||
|
import io.druid.server.router.RouterQuerySegmentWalker;
|
||||||
|
import io.druid.server.router.TierConfig;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
@Command(
|
||||||
|
name = "router",
|
||||||
|
description = "Experimental! Understands tiers and routes things to different brokers"
|
||||||
|
)
|
||||||
|
public class CliRouter extends ServerRunnable
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(CliRouter.class);
|
||||||
|
|
||||||
|
public CliRouter()
|
||||||
|
{
|
||||||
|
super(log);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<Object> getModules()
|
||||||
|
{
|
||||||
|
return ImmutableList.<Object>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
|
||||||
|
|
||||||
|
binder.bind(CoordinatorRuleManager.class);
|
||||||
|
LifecycleModule.register(binder, CoordinatorRuleManager.class);
|
||||||
|
|
||||||
|
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
|
||||||
|
|
||||||
|
binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
|
||||||
|
binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
|
||||||
|
|
||||||
|
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
||||||
|
Jerseys.addResource(binder, QueryResource.class);
|
||||||
|
LifecycleModule.register(binder, QueryResource.class);
|
||||||
|
|
||||||
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
DiscoveryModule.register(binder, Self.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@ManageLifecycle
|
||||||
|
public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
|
||||||
|
TierConfig config,
|
||||||
|
ServerDiscoveryFactory factory
|
||||||
|
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return factory.createSelector(config.getCoordinatorServiceName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -52,7 +52,7 @@ public class Main
|
||||||
.withCommands(
|
.withCommands(
|
||||||
CliCoordinator.class, CliHistorical.class, CliBroker.class,
|
CliCoordinator.class, CliHistorical.class, CliBroker.class,
|
||||||
CliRealtime.class, CliOverlord.class, CliMiddleManager.class,
|
CliRealtime.class, CliOverlord.class, CliMiddleManager.class,
|
||||||
CliBridge.class
|
CliBridge.class, CliRouter.class
|
||||||
);
|
);
|
||||||
|
|
||||||
builder.withGroup("example")
|
builder.withGroup("example")
|
||||||
|
|