mirror of https://github.com/apache/druid.git
Merge pull request #799 from metamx/js-tiered-broker-selector
JavaScript tiered broker selector
This commit is contained in:
commit
5a7d186e88
|
@ -159,4 +159,4 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
|
|||
|
||||
```json
|
||||
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
|
||||
```
|
||||
```
|
||||
|
|
|
@ -118,3 +118,16 @@ Including this strategy means all timeBoundary queries are always routed to the
|
|||
```
|
||||
|
||||
Queries with a priority set to less than minPriority are routed to the lowest priority broker. Queries with priority set to greater than maxPriority are routed to the highest priority broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic.
|
||||
|
||||
### javascript
|
||||
|
||||
Allows defining arbitrary routing rules using a JavaScript function. The function is passed the configuration and the query to be executed, and returns the tier it should be routed to, or null for the default tier.
|
||||
|
||||
*Example*: a function that return the highest priority broker unless the given query has more than two aggregators.
|
||||
|
||||
```json
|
||||
{
|
||||
"type" : "javascript",
|
||||
"function" : "function (config, query) { if (config.getTierToBrokerMap().values().size() > 0 && query.getAggregatorSpecs && query.getAggregatorSpecs().size() <= 2) { return config.getTierToBrokerMap().values().toArray()[0] } else { return config.getDefaultBrokerServiceName() } }"
|
||||
}
|
||||
```
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import io.druid.query.Query;
|
||||
|
||||
import javax.script.Compilable;
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptEngine;
|
||||
import javax.script.ScriptEngineManager;
|
||||
import javax.script.ScriptException;
|
||||
|
||||
public class JavaScriptTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
|
||||
{
|
||||
private static interface SelectorFunction
|
||||
{
|
||||
public String apply(TieredBrokerConfig config, Query query);
|
||||
}
|
||||
|
||||
private final SelectorFunction fnSelector;
|
||||
private final String function;
|
||||
|
||||
@JsonCreator
|
||||
public JavaScriptTieredBrokerSelectorStrategy(@JsonProperty("function") String fn)
|
||||
{
|
||||
Preconditions.checkNotNull(fn, "function must not be null");
|
||||
|
||||
final ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
|
||||
try {
|
||||
((Compilable)engine).compile("var apply = " + fn).eval();
|
||||
} catch(ScriptException e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
this.function = fn;
|
||||
this.fnSelector = ((Invocable)engine).getInterface(SelectorFunction.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> getBrokerServiceName(
|
||||
TieredBrokerConfig config, Query query
|
||||
)
|
||||
{
|
||||
return Optional.fromNullable(fnSelector.apply(config, query));
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFunction()
|
||||
{
|
||||
return function;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
JavaScriptTieredBrokerSelectorStrategy that = (JavaScriptTieredBrokerSelectorStrategy) o;
|
||||
|
||||
if (!function.equals(that.function)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return function.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "JavascriptTieredBrokerSelectorStrategy{" +
|
||||
"function='" + function + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -29,7 +29,8 @@ import io.druid.query.Query;
|
|||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
|
||||
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class)
|
||||
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class),
|
||||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptTieredBrokerSelectorStrategy.class)
|
||||
})
|
||||
|
||||
public interface TieredBrokerSelectorStrategy
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public class JavaScriptTieredBrokerSelectorStrategyTest
|
||||
{
|
||||
final TieredBrokerSelectorStrategy jsStrategy = new JavaScriptTieredBrokerSelectorStrategy(
|
||||
"function (config, query) { if (config.getTierToBrokerMap().values().size() > 0 && query.getAggregatorSpecs && query.getAggregatorSpecs().size() <= 2) { return config.getTierToBrokerMap().values().toArray()[0] } else { return config.getDefaultBrokerServiceName() } }"
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
Assert.assertEquals(
|
||||
jsStrategy,
|
||||
mapper.readValue(
|
||||
mapper.writeValueAsString(jsStrategy),
|
||||
JavaScriptTieredBrokerSelectorStrategy.class
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBrokerServiceName() throws Exception
|
||||
{
|
||||
|
||||
|
||||
final LinkedHashMap<String, String> tierBrokerMap = new LinkedHashMap<>();
|
||||
tierBrokerMap.put("fast", "druid/fastBroker");
|
||||
tierBrokerMap.put("slow", "druid/broker");
|
||||
|
||||
final TieredBrokerConfig tieredBrokerConfig = new TieredBrokerConfig()
|
||||
{
|
||||
@Override
|
||||
public String getDefaultBrokerServiceName()
|
||||
{
|
||||
return "druid/broker";
|
||||
}
|
||||
|
||||
@Override
|
||||
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||
{
|
||||
return tierBrokerMap;
|
||||
}
|
||||
};
|
||||
|
||||
final Druids.TimeseriesQueryBuilder queryBuilder = Druids.newTimeseriesQueryBuilder().dataSource("test")
|
||||
.intervals("2014/2015")
|
||||
.aggregators(
|
||||
ImmutableList.<AggregatorFactory>of(
|
||||
new CountAggregatorFactory("count")
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
Optional.of("druid/fastBroker"),
|
||||
jsStrategy.getBrokerServiceName(
|
||||
tieredBrokerConfig,
|
||||
queryBuilder.build()
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
Assert.assertEquals(
|
||||
Optional.of("druid/broker"),
|
||||
jsStrategy.getBrokerServiceName(
|
||||
tieredBrokerConfig,
|
||||
Druids.newTimeBoundaryQueryBuilder().dataSource("test").bound("maxTime").build()
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
Optional.of("druid/broker"),
|
||||
jsStrategy.getBrokerServiceName(
|
||||
tieredBrokerConfig,
|
||||
queryBuilder.aggregators(
|
||||
ImmutableList.of(
|
||||
new CountAggregatorFactory("count"),
|
||||
new LongSumAggregatorFactory("longSum", "a"),
|
||||
new DoubleSumAggregatorFactory("doubleSum", "b")
|
||||
)
|
||||
).build()
|
||||
)
|
||||
);
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue