Merge pull request #672 from metamx/router-strat

Add more config to router strategies; fix bad test
This commit is contained in:
xvrl 2014-08-08 11:37:35 -07:00
commit 038f2de496
4 changed files with 78 additions and 46 deletions

View File

@ -19,6 +19,8 @@
package io.druid.server.router; package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import io.druid.query.Query; import io.druid.query.Query;
@ -27,12 +29,32 @@ import io.druid.query.Query;
*/ */
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{ {
private final int minPriority;
private final int maxPriority;
@JsonCreator
public PriorityTieredBrokerSelectorStrategy(
@JsonProperty("minPriority") Integer minPriority,
@JsonProperty("maxPriority") Integer maxPriority
)
{
this.minPriority = minPriority == null ? 0 : minPriority;
this.maxPriority = maxPriority == null ? 1 : maxPriority;
}
@Override @Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{ {
final int priority = query.getContextPriority(0); final int priority = query.getContextPriority(0);
if (priority < 0) { if (priority < minPriority) {
return Optional.of(
Iterables.getLast(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
} else if (priority >= maxPriority) {
return Optional.of( return Optional.of(
Iterables.getFirst( Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(), tierConfig.getTierToBrokerMap().values(),

View File

@ -20,12 +20,15 @@
package io.druid.server.router; package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import org.joda.time.Period; import org.joda.time.Period;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
/** /**
*/ */
@ -56,7 +59,10 @@ public class TieredBrokerConfig
@JsonProperty @JsonProperty
@NotNull @NotNull
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]"; private List<TieredBrokerSelectorStrategy> strategies = Arrays.asList(
new TimeBoundaryTieredBrokerSelectorStrategy(),
new PriorityTieredBrokerSelectorStrategy(0, 1)
);
// tier, <bard, numThreads> // tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap() public LinkedHashMap<String, String> getTierToBrokerMap()
@ -93,8 +99,8 @@ public class TieredBrokerConfig
return pollPeriod; return pollPeriod;
} }
public String getStrategies() public List<TieredBrokerSelectorStrategy> getStrategies()
{ {
return strategies; return ImmutableList.copyOf(strategies);
} }
} }

View File

@ -19,10 +19,6 @@
package io.druid.server.router; package io.druid.server.router;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Provider; import com.google.inject.Provider;
@ -32,23 +28,12 @@ import java.util.List;
*/ */
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>> public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
{ {
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList(); private final List<TieredBrokerSelectorStrategy> strategies;
@Inject @Inject
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config) public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config)
{ {
try { this.strategies = config.getStrategies();
this.strategies.addAll(
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
@Override @Override

View File

@ -22,7 +22,6 @@ package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.metamx.common.Pair;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoveryFactory;
@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Json;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import junit.framework.Assert; import junit.framework.Assert;
@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest
} }
}, },
factory, factory,
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy()) Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
); );
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce(); EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory); EasyMock.replay(factory);
@ -198,28 +195,50 @@ public class TieredBrokerHostSelectorTest
} }
@Test @Test
public void testPrioritySelect() throws Exception public void testPrioritySelect() throws Exception
{ {
String brokerName = (String) brokerSelector.select( String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder() Druids.newTimeseriesQueryBuilder()
.dataSource("test") .dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count"))) .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.intervals( .intervals(
new MultipleIntervalSegmentSpec( new MultipleIntervalSegmentSpec(
Arrays.<Interval>asList( Arrays.<Interval>asList(
new Interval("2011-08-31/2011-09-01"), new Interval("2011-08-31/2011-09-01"),
new Interval("2012-08-31/2012-09-01"), new Interval("2012-08-31/2012-09-01"),
new Interval("2013-08-31/2013-09-01") new Interval("2013-08-31/2013-09-01")
) )
) )
) )
.context(ImmutableMap.<String, Object>of("priority", -1)) .context(ImmutableMap.<String, Object>of("priority", -1))
.build() .build()
).lhs; ).lhs;
Assert.assertEquals("hotBroker", brokerName); Assert.assertEquals("coldBroker", brokerName);
} }
@Test
public void testPrioritySelect2() throws Exception
{
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.intervals(
new MultipleIntervalSegmentSpec(
Arrays.<Interval>asList(
new Interval("2011-08-31/2011-09-01"),
new Interval("2012-08-31/2012-09-01"),
new Interval("2013-08-31/2013-09-01")
)
)
)
.context(ImmutableMap.<String, Object>of("priority", 5))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
}
private static class TestRuleManager extends CoordinatorRuleManager private static class TestRuleManager extends CoordinatorRuleManager
{ {