fix context to be backwards compat

fjy 2014-04-06 09:20:58 -07:00
parent fc80cf0f32
commit 1267fbb7f5
14 changed files with 169 additions and 86 deletions
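The crux of the change: query context values have historically been sent over the wire as strings ("true", "false", "0"), while newer callers put native booleans and integers into the context map. The typed accessors added below (getContextPriority, getContextBySegment, getContextPopulateCache, getContextUseCache, getContextFinalize) therefore have to accept both encodings. A minimal standalone sketch of that coercion pattern, mirroring the accessors added to BaseQuery in this commit (the class and helper names here are illustrative, not part of the Druid source):

import java.util.Map;

public class ContextCompat
{
  // Accepts the legacy string encoding ("true") as well as a native Boolean.
  public static boolean getContextBoolean(Map<String, Object> context, String key, boolean defaultValue)
  {
    Object val = context.get(key);
    if (val == null) {
      return defaultValue;
    }
    if (val instanceof String) {
      return Boolean.parseBoolean((String) val);
    } else if (val instanceof Boolean) {
      return (Boolean) val;
    }
    throw new IllegalStateException("Unknown type " + val.getClass());
  }

  // Accepts the legacy string encoding ("7") as well as a native Integer.
  public static int getContextInt(Map<String, Object> context, String key, int defaultValue)
  {
    Object val = context.get(key);
    if (val == null) {
      return defaultValue;
    }
    if (val instanceof String) {
      return Integer.parseInt((String) val);
    } else if (val instanceof Integer) {
      return (Integer) val;
    }
    throw new IllegalStateException("Unknown type " + val.getClass());
  }
}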

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.Sequence;
 import io.druid.query.spec.QuerySegmentSpec;
 import org.joda.time.Duration;
@@ -120,6 +121,70 @@ public abstract class BaseQuery<T> implements Query<T>
     return retVal == null ? defaultValue : retVal;
   }
 
+  @Override
+  public int getContextPriority(int defaultValue)
+  {
+    Object val = context.get("priority");
+    if (val == null) {
+      return defaultValue;
+    }
+    if (val instanceof String) {
+      return Integer.parseInt((String) val);
+    } else if (val instanceof Integer) {
+      return (int) val;
+    } else {
+      throw new ISE("Unknown type [%s]", val.getClass());
+    }
+  }
+
+  @Override
+  public boolean getContextBySegment(boolean defaultValue)
+  {
+    Object val = context.get("bySegment");
+    if (val == null) {
+      return defaultValue;
+    }
+    if (val instanceof String) {
+      return Boolean.parseBoolean((String) val);
+    } else if (val instanceof Boolean) {
+      return (boolean) val;
+    } else {
+      throw new ISE("Unknown type [%s]", val.getClass());
+    }
+  }
+
+  @Override
+  public boolean getContextPopulateCache(boolean defaultValue)
+  {
+    Object val = context.get("populateCache");
+    if (val == null) {
+      return defaultValue;
+    }
+    if (val instanceof String) {
+      return Boolean.parseBoolean((String) val);
+    } else if (val instanceof Boolean) {
+      return (boolean) val;
+    } else {
+      throw new ISE("Unknown type [%s]", val.getClass());
+    }
+  }
+
+  @Override
+  public boolean getContextUseCache(boolean defaultValue)
+  {
+    Object val = context.get("useCache");
+    if (val == null) {
+      return defaultValue;
+    }
+    if (val instanceof String) {
+      return Boolean.parseBoolean((String) val);
+    } else if (val instanceof Boolean) {
+      return (boolean) val;
+    } else {
+      throw new ISE("Unknown type [%s]", val.getClass());
+    }
+  }
+
   protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
   {
     Map<String, Object> overridden = Maps.newTreeMap();

View File

@@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
   @SuppressWarnings("unchecked")
   public Sequence<T> run(final Query<T> query)
   {
-    if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
+    if (query.getContextBySegment(false)) {
       final Sequence<T> baseSequence = base.run(query);
       return new Sequence<T>()
       {

View File

@@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
   @Override
   public Sequence<T> run(Query<T> query)
   {
-    if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
+    if (query.getContextBySegment(false)) {
       return baseRunner.run(query);
     }

View File

@@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
   @Override
   public Sequence<T> run(final Query<T> query)
   {
-    final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
+    final int priority = query.getContextPriority(0);
 
     return new BaseSequence<T, Iterator<T>>(
         new BaseSequence.IteratorMaker<T, Iterator<T>>()

View File

@@ -48,8 +48,8 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
   @Override
   public Sequence<T> run(final Query<T> query)
   {
-    final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment"));
-    final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
+    final boolean isBySegment = query.getContextBySegment(false);
+    final boolean shouldFinalize = query.getContextFinalize(true);
     if (shouldFinalize) {
       Function<T, T> finalizerFn;
       if (isBySegment) {
@@ -84,8 +84,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
             );
           }
         };
-      }
-      else {
+      } else {
         finalizerFn = toolChest.makeMetricManipulatorFn(
             query,
             new MetricManipulationFn()
@@ -100,7 +99,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
     }
 
     return Sequences.map(
-        baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))),
+        baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
         finalizerFn
     );
   }

View File

@@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
         query,
         configSupplier.get()
     );
-    final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
+    final int priority = query.getContextPriority(0);
 
     if (Iterables.isEmpty(queryables)) {
       log.warn("No queryables found.");

View File

@@ -74,6 +74,13 @@ public interface Query<T>
   public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
 
+  // For backwards compatibility
+  public int getContextPriority(int defaultValue);
+  public boolean getContextBySegment(boolean defaultValue);
+  public boolean getContextPopulateCache(boolean defaultValue);
+  public boolean getContextUseCache(boolean defaultValue);
+  public boolean getContextFinalize(boolean defaultValue);
+
   public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
 
   public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);

View File

@@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>>
           return runner.run(query);
         }
 
-        final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
+        final boolean isBySegment = query.getContextBySegment(false);
 
         return Sequences.map(
             runner.run(query.withLimit(config.getMaxSearchLimit())),

View File

@@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>>
           return runner.run(query);
         }
 
-        final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
+        final boolean isBySegment = query.getContextBySegment(false);
 
         return Sequences.map(
             runner.run(query.withThreshold(minTopNThreshold)),

View File

@@ -70,7 +70,7 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
     final CacheStrategy strategy = toolChest.getCacheStrategy(query);
 
-    final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
+    final boolean populateCache = query.getContextPopulateCache(true)
         && strategy != null
         && cacheConfig.isPopulateCache()
         // historical only populates distributed cache since the cache lookups are done at broker.

View File

@@ -62,7 +62,6 @@ import io.druid.timeline.partition.PartitionChunk;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -125,24 +124,24 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
     final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
 
-    final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true"))
+    final boolean useCache = query.getContextUseCache(true)
         && strategy != null
         && cacheConfig.isUseCache();
-    final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
+    final boolean populateCache = query.getContextPopulateCache(true)
         && strategy != null && cacheConfig.isPopulateCache();
-    final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
+    final boolean isBySegment = query.getContextBySegment(false);
 
     ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
 
-    final String priority = query.getContextValue("priority", "0");
+    final int priority = query.getContextPriority(0);
     contextBuilder.put("priority", priority);
 
     if (populateCache) {
-      contextBuilder.put(CacheConfig.POPULATE_CACHE, "false");
-      contextBuilder.put("bySegment", "true");
+      contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
+      contextBuilder.put("bySegment", true);
     }
-    contextBuilder.put("intermediate", "true");
+    contextBuilder.put("intermediate", true);
 
     final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
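Note the asymmetry the new accessors have to absorb: the broker now writes native types (false, true, ints) into the rewritten query's context, while pre-change clients may still send the same keys as strings. A hypothetical round trip, reusing the ContextCompat helper sketched above:

Map<String, Object> legacy = new java.util.HashMap<>();
legacy.put("bySegment", "true");   // what an old client sends
Map<String, Object> typed = new java.util.HashMap<>();
typed.put("bySegment", true);      // what the rewritten query now carries

// Both encodings resolve to the same value through the lenient accessor.
assert ContextCompat.getContextBoolean(legacy, "bySegment", false);
assert ContextCompat.getContextBoolean(typed, "bySegment", false);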

View File

@@ -106,7 +106,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
   public Sequence<T> run(Query<T> query)
   {
     QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
-    boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
+    boolean isBySegment = query.getContextBySegment(false);
 
     Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
     if (types == null) {

View File

@@ -106,8 +106,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
       }
       req.setAttribute(DISPATCHED, true);
-      resp.setStatus(200);
-      resp.setContentType("application/x-javascript");
 
       query = objectMapper.readValue(req.getInputStream(), Query.class);
       queryId = query.getId();
@@ -132,6 +130,9 @@ public class AsyncQueryForwardingServlet extends HttpServlet
           @Override
           public ClientResponse<OutputStream> handleResponse(HttpResponse response)
           {
+            resp.setStatus(response.getStatus().getCode());
+            resp.setContentType("application/x-javascript");
+
             byte[] bytes = getContentBytes(response.getContent());
             if (bytes.length > 0) {
               try {
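The servlet change is related but orthogonal to the context typing: rather than unconditionally writing a 200 status and content type before the proxied query has run, the status is now copied from the upstream HttpResponse inside handleResponse, so error codes from the backing server propagate to the caller.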

View File

@@ -214,13 +214,13 @@ public class CachingClusteredClientTest
   public void testTimeseriesCaching() throws Exception
   {
     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
         .dataSource(DATA_SOURCE)
         .intervals(SEG_SPEC)
         .filters(DIM_FILTER)
         .granularity(GRANULARITY)
         .aggregators(AGGS)
         .postAggregators(POST_AGGS)
         .context(CONTEXT);
 
     testQueryCaching(
         builder.build(),
@@ -265,9 +265,9 @@ public class CachingClusteredClientTest
         ),
         client.run(
             builder.intervals("2011-01-01/2011-01-10")
                    .aggregators(RENAMED_AGGS)
                    .postAggregators(RENAMED_POST_AGGS)
                    .build()
         )
     );
   }
@@ -277,13 +277,13 @@ public class CachingClusteredClientTest
   public void testTimeseriesCachingTimeZone() throws Exception
   {
     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
         .dataSource(DATA_SOURCE)
         .intervals(SEG_SPEC)
         .filters(DIM_FILTER)
         .granularity(PT1H_TZ_GRANULARITY)
         .aggregators(AGGS)
         .postAggregators(POST_AGGS)
         .context(CONTEXT);
 
     testQueryCaching(
         builder.build(),
@@ -305,9 +305,9 @@ public class CachingClusteredClientTest
         ),
         client.run(
             builder.intervals("2011-11-04/2011-11-08")
                    .aggregators(RENAMED_AGGS)
                    .postAggregators(RENAMED_POST_AGGS)
                    .build()
         )
     );
   }
@@ -316,18 +316,22 @@ public class CachingClusteredClientTest
   public void testDisableUseCache() throws Exception
   {
     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
         .dataSource(DATA_SOURCE)
         .intervals(SEG_SPEC)
         .filters(DIM_FILTER)
         .granularity(GRANULARITY)
         .aggregators(AGGS)
         .postAggregators(POST_AGGS);
 
     testQueryCaching(
         1,
         true,
-        builder.context(ImmutableMap.<String, Object>of("useCache", "false",
-                                                        "populateCache", "true")).build(),
+        builder.context(
+            ImmutableMap.<String, Object>of(
+                "useCache", "false",
+                "populateCache", "true"
+            )
+        ).build(),
         new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
     );
@@ -340,8 +344,12 @@ public class CachingClusteredClientTest
     testQueryCaching(
         1,
         false,
-        builder.context(ImmutableMap.<String, Object>of("useCache", "false",
-                                                        "populateCache", "false")).build(),
+        builder.context(
+            ImmutableMap.<String, Object>of(
+                "useCache", "false",
+                "populateCache", "false"
+            )
+        ).build(),
         new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
     );
@@ -352,8 +360,12 @@ public class CachingClusteredClientTest
     testQueryCaching(
         1,
         false,
-        builder.context(ImmutableMap.<String, Object>of("useCache", "true",
-                                                        "populateCache", "false")).build(),
+        builder.context(
+            ImmutableMap.<String, Object>of(
+                "useCache", "true",
+                "populateCache", "false"
+            )
+        ).build(),
         new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
     );
@@ -422,10 +434,10 @@ public class CachingClusteredClientTest
         ),
         client.run(
             builder.intervals("2011-01-01/2011-01-10")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
                    .postAggregators(RENAMED_POST_AGGS)
                    .build()
         )
     );
   }
@@ -467,10 +479,10 @@ public class CachingClusteredClientTest
         ),
         client.run(
             builder.intervals("2011-11-04/2011-11-08")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
                    .postAggregators(RENAMED_POST_AGGS)
                    .build()
         )
     );
   }
@@ -533,10 +545,10 @@ public class CachingClusteredClientTest
         ),
         client.run(
             builder.intervals("2011-01-01/2011-01-10")
                    .metric("imps")
                    .aggregators(RENAMED_AGGS)
                    .postAggregators(RENAMED_POST_AGGS)
                    .build()
         )
     );
   }
@@ -638,8 +650,8 @@ public class CachingClusteredClientTest
       EasyMock.expect(serverView.getQueryRunner(server))
               .andReturn(expectations.getQueryRunner())
               .once();
 
       final Capture<? extends Query> capture = new Capture();
       queryCaptures.add(capture);
@@ -656,8 +668,8 @@ public class CachingClusteredClientTest
       }
 
       EasyMock.expect(queryable.run(EasyMock.capture(capture)))
              .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
              .once();
     } else if (query instanceof TopNQuery) {
       List<String> segmentIds = Lists.newArrayList();
@@ -669,8 +681,8 @@ public class CachingClusteredClientTest
         results.add(expectation.getResults());
       }
 
       EasyMock.expect(queryable.run(EasyMock.capture(capture)))
              .andReturn(toQueryableTopNResults(segmentIds, intervals, results))
              .once();
     } else if (query instanceof SearchQuery) {
       List<String> segmentIds = Lists.newArrayList();
       List<Interval> intervals = Lists.newArrayList();
@@ -681,8 +693,8 @@ public class CachingClusteredClientTest
         results.add(expectation.getResults());
       }
 
       EasyMock.expect(queryable.run(EasyMock.capture(capture)))
              .andReturn(toQueryableSearchResults(segmentIds, intervals, results))
              .once();
     } else if (query instanceof TimeBoundaryQuery) {
       List<String> segmentIds = Lists.newArrayList();
       List<Interval> intervals = Lists.newArrayList();
@@ -693,8 +705,8 @@ public class CachingClusteredClientTest
         results.add(expectation.getResults());
       }
 
       EasyMock.expect(queryable.run(EasyMock.capture(capture)))
              .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
              .once();
     } else {
       throw new ISE("Unknown query type[%s]", query.getClass());
     }
@@ -762,11 +774,11 @@ public class CachingClusteredClientTest
     for (Capture queryCapture : queryCaptures) {
       Query capturedQuery = (Query) queryCapture.getValue();
       if (expectBySegment) {
-        Assert.assertEquals("true", capturedQuery.getContextValue("bySegment"));
+        Assert.assertEquals(true, capturedQuery.<Boolean>getContextValue("bySegment"));
       } else {
         Assert.assertTrue(
-            capturedQuery.getContextValue("bySegment") == null ||
-            capturedQuery.getContextValue("bySegment").equals("false")
+            capturedQuery.getContextValue("bySegment") == null ||
+            capturedQuery.getContextValue("bySegment").equals(false)
         );
       }
     }
@@ -1160,13 +1172,13 @@ public class CachingClusteredClientTest
     return new CachingClusteredClient(
         new MapQueryToolChestWarehouse(
             ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
                         .put(
                             TimeseriesQuery.class,
                             new TimeseriesQueryQueryToolChest(new QueryConfig())
                         )
                         .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
                         .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
                         .build()
         ),
         new TimelineServerView()
         {