diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 4fbcd78a602..d911aef7d35 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -19,6 +19,8 @@ package io.druid.server.router; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import io.druid.query.Query; @@ -27,12 +29,32 @@ import io.druid.query.Query; */ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy { + private final int minPriority; + private final int maxPriority; + + @JsonCreator + public PriorityTieredBrokerSelectorStrategy( + @JsonProperty("minPriority") Integer minPriority, + @JsonProperty("maxPriority") Integer maxPriority + ) + { + this.minPriority = minPriority == null ? 0 : minPriority; + this.maxPriority = maxPriority == null ? 1 : maxPriority; + } + @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { final int priority = query.getContextPriority(0); - if (priority < 0) { + if (priority < minPriority) { + return Optional.of( + Iterables.getLast( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } else if (priority >= maxPriority) { return Optional.of( Iterables.getFirst( tierConfig.getTierToBrokerMap().values(), diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index 395dd81e2e6..283b9d23e7f 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -25,7 +25,9 @@ import io.druid.client.DruidServer; import org.joda.time.Period; import javax.validation.constraints.NotNull; +import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; /** */ @@ -56,7 +58,10 @@ public class TieredBrokerConfig @JsonProperty @NotNull - private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]"; + private List strategies = Arrays.asList( + new TimeBoundaryTieredBrokerSelectorStrategy(), + new PriorityTieredBrokerSelectorStrategy(0, 0) + ); // tier, public LinkedHashMap getTierToBrokerMap() @@ -93,7 +98,7 @@ public class TieredBrokerConfig return pollPeriod; } - public String getStrategies() + public List getStrategies() { return strategies; } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java index 3300261b7d4..7e3ea6ae472 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java @@ -19,8 +19,6 @@ package io.druid.server.router; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.util.Lists; import com.google.common.base.Throwables; import com.google.inject.Inject; @@ -35,16 +33,10 @@ public class TieredBrokerSelectorStrategiesProvider implements Provider strategies = Lists.newArrayList(); @Inject - public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config) + public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config) { try { - this.strategies.addAll( - (List) jsonMapper.readValue( - config.getStrategies(), new TypeReference>() - { - } - ) - ); + this.strategies.addAll(config.getStrategies()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 3d1d0cd0bb0..5c2320f9885 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -22,7 +22,6 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; -import com.metamx.common.Pair; import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; import io.druid.curator.discovery.ServerDiscoveryFactory; @@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.query.Druids; -import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; import junit.framework.Assert; @@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest } }, factory, - Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy()) + Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) ); EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); EasyMock.replay(factory); @@ -198,28 +195,50 @@ public class TieredBrokerHostSelectorTest } @Test - public void testPrioritySelect() throws Exception - { - String brokerName = (String) brokerSelector.select( - Druids.newTimeseriesQueryBuilder() - .dataSource("test") - .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) - .intervals( - new MultipleIntervalSegmentSpec( - Arrays.asList( - new Interval("2011-08-31/2011-09-01"), - new Interval("2012-08-31/2012-09-01"), - new Interval("2013-08-31/2013-09-01") - ) - ) - ) - .context(ImmutableMap.of("priority", -1)) - .build() - ).lhs; + public void testPrioritySelect() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", -1)) + .build() + ).lhs; - Assert.assertEquals("hotBroker", brokerName); - } + Assert.assertEquals("coldBroker", brokerName); + } + @Test + public void testPrioritySelect2() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", 5)) + .build() + ).lhs; + + Assert.assertEquals("hotBroker", brokerName); + } private static class TestRuleManager extends CoordinatorRuleManager {