Merge pull request #436 from metamx/better-context

Extend context to able to take more sophisticated objects than just strings
This commit is contained in:
fjy 2014-03-24 14:40:45 -06:00
commit b0790783b7
21 changed files with 80 additions and 67 deletions

View File

@ -37,14 +37,14 @@ public abstract class BaseQuery<T> implements Query<T>
{
public static String QUERYID = "queryId";
private final DataSource dataSource;
private final Map<String, String> context;
private final Map<String, Object> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;
public BaseQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
Map<String, String> context
Map<String, Object> context
)
{
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
@ -102,28 +102,28 @@ public abstract class BaseQuery<T> implements Query<T>
}
@JsonProperty
public Map<String, String> getContext()
public Map<String, Object> getContext()
{
return context;
}
@Override
public String getContextValue(String key)
public <ContextType> ContextType getContextValue(String key)
{
return context == null ? null : context.get(key);
return context == null ? null : (ContextType) context.get(key);
}
@Override
public String getContextValue(String key, String defaultValue)
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue)
{
String retVal = getContextValue(key);
ContextType retVal = getContextValue(key);
return retVal == null ? defaultValue : retVal;
}
protected Map<String, String> computeOverridenContext(Map<String, String> overrides)
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
{
Map<String, String> overridden = Maps.newTreeMap();
final Map<String, String> context = getContext();
Map<String, Object> overridden = Maps.newTreeMap();
final Map<String, Object> context = getContext();
if (context != null) {
overridden.putAll(context);
}
@ -135,28 +135,41 @@ public abstract class BaseQuery<T> implements Query<T>
@Override
public String getId()
{
return getContextValue(QUERYID);
return (String) getContextValue(QUERYID);
}
@Override
public Query withId(String id)
{
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BaseQuery baseQuery = (BaseQuery) o;
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
return false;
}
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) {
return false;
}
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) {
return false;
}
if (querySegmentSpec != null
? !querySegmentSpec.equals(baseQuery.querySegmentSpec)
: baseQuery.querySegmentSpec != null) {
return false;
}
return true;
}

View File

@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query)
{
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
final Sequence<T> baseSequence = base.run(query);
return new Sequence<T>()
{

View File

@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(Query<T> query)
{
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
return baseRunner.run(query);
}

View File

@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()

View File

@ -304,7 +304,7 @@ public class Druids
private QueryGranularity granularity;
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, String> context;
private Map<String, Object> context;
private TimeseriesQueryBuilder()
{
@ -384,7 +384,7 @@ public class Druids
return postAggregatorSpecs;
}
public Map<String, String> getContext()
public Map<String, Object> getContext()
{
return context;
}
@ -465,7 +465,7 @@ public class Druids
return this;
}
public TimeseriesQueryBuilder context(Map<String, String> c)
public TimeseriesQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
@ -505,7 +505,7 @@ public class Druids
private QuerySegmentSpec querySegmentSpec;
private List<String> dimensions;
private SearchQuerySpec querySpec;
private Map<String, String> context;
private Map<String, Object> context;
public SearchQueryBuilder()
{
@ -660,7 +660,7 @@ public class Druids
return this;
}
public SearchQueryBuilder context(Map<String, String> c)
public SearchQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
@ -690,7 +690,7 @@ public class Druids
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, String> context;
private Map<String, Object> context;
public TimeBoundaryQueryBuilder()
{
@ -746,7 +746,7 @@ public class Druids
return this;
}
public TimeBoundaryQueryBuilder context(Map<String, String> c)
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;

View File

@ -48,7 +48,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment"));
final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment"));
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
if (shouldFinalize) {
Function<T, T> finalizerFn;
@ -100,7 +100,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))),
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))),
finalizerFn
);
}

View File

@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
query,
configSupplier.get()
);
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");

View File

@ -70,11 +70,11 @@ public interface Query<T>
public Duration getDuration();
public String getContextValue(String key);
public <ContextType> ContextType getContextValue(String key);
public String getContextValue(String key, String defaultValue);
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
public Query<T> withOverriddenContext(Map<String, String> contextOverride);
public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);

View File

@ -86,7 +86,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("orderBy") LimitSpec orderBySpec,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -147,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
HavingSpec havingSpec,
LimitSpec orderBySpec,
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
Map<String, String> context
Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery<Row>
}
@Override
public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
{
return new GroupByQuery(
getDataSource(),
@ -268,7 +268,7 @@ public class GroupByQuery extends BaseQuery<Row>
private List<PostAggregator> postAggregatorSpecs;
private HavingSpec havingSpec;
private Map<String, String> context;
private Map<String, Object> context;
private LimitSpec limitSpec = null;
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
@ -443,7 +443,7 @@ public class GroupByQuery extends BaseQuery<Row>
return this;
}
public Builder setContext(Map<String, String> context)
public Builder setContext(Map<String, Object> context)
{
this.context = context;
return this;

View File

@ -58,7 +58,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
private final Supplier<GroupByQueryConfig> configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@ -80,7 +80,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> run(Query<Row> input)
{
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
} else {
return runner.run(input);

View File

@ -42,7 +42,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("toInclude") ColumnIncluderator toInclude,
@JsonProperty("merge") Boolean merge,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -77,7 +77,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
}
@Override
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextOverride)
{
return new SegmentMetadataQuery(
getDataSource(),

View File

@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit())),

View File

@ -58,7 +58,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@JsonProperty("searchDimensions") List<String> dimensions,
@JsonProperty("query") SearchQuerySpec querySpec,
@JsonProperty("sort") SearchSortSpec sortSpec,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -112,7 +112,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
}
@Override
public SearchQuery withOverriddenContext(Map<String, String> contextOverrides)
public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new SearchQuery(
getDataSource(),

View File

@ -53,7 +53,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -120,7 +120,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
);
}
public SelectQuery withOverriddenContext(Map<String, String> contextOverrides)
public SelectQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new SelectQuery(
getDataSource(),

View File

@ -54,7 +54,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
public TimeBoundaryQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(
@ -78,7 +78,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
}
@Override
public TimeBoundaryQuery withOverriddenContext(Map<String, String> contextOverrides)
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeBoundaryQuery(
getDataSource(),

View File

@ -55,7 +55,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -116,7 +116,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
);
}
public TimeseriesQuery withOverriddenContext(Map<String, String> contextOverrides)
public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TimeseriesQuery(
getDataSource(),

View File

@ -62,7 +62,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("context") Map<String, String> context
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, context);
@ -178,7 +178,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
);
}
public TopNQuery withOverriddenContext(Map<String, String> contextOverrides)
public TopNQuery withOverriddenContext(Map<String, Object> contextOverrides)
{
return new TopNQuery(
getDataSource(),

View File

@ -69,7 +69,7 @@ public class TopNQueryBuilder
private QueryGranularity granularity;
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, String> context;
private Map<String, Object> context;
public TopNQueryBuilder()
{
@ -130,7 +130,7 @@ public class TopNQueryBuilder
return postAggregatorSpecs;
}
public Map<String, String> getContext()
public Map<String, Object> getContext()
{
return context;
}
@ -290,7 +290,7 @@ public class TopNQueryBuilder
return this;
}
public TopNQueryBuilder context(Map<String, String> c)
public TopNQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;

View File

@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold)),

View File

@ -122,15 +122,15 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
final boolean useCache = Boolean.parseBoolean((String) query.getContextValue("useCache", "true")) && strategy != null;
final boolean populateCache = Boolean.parseBoolean((String) query.getContextValue("populateCache", "true"))
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final String priority = query.getContextValue("priority", "0");
final String priority = (String) query.getContextValue("priority", "0");
contextBuilder.put("priority", priority);
if (populateCache) {

View File

@ -121,7 +121,7 @@ public class CachingClusteredClientTest
*/
private static final int RANDOMNESS = 10;
public static final ImmutableMap<String, String> CONTEXT = ImmutableMap.of();
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
@ -325,7 +325,7 @@ public class CachingClusteredClientTest
testQueryCaching(
1,
true,
builder.context(ImmutableMap.of("useCache", "false",
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "true")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@ -339,7 +339,7 @@ public class CachingClusteredClientTest
testQueryCaching(
1,
false,
builder.context(ImmutableMap.of("useCache", "false",
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
@ -351,7 +351,7 @@ public class CachingClusteredClientTest
testQueryCaching(
1,
false,
builder.context(ImmutableMap.of("useCache", "true",
builder.context(ImmutableMap.<String, Object>of("useCache", "true",
"populateCache", "false")).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);