diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java new file mode 100644 index 00000000000..4fbcd78a602 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -0,0 +1,46 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import io.druid.query.Query; + +/** + */ +public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +{ + @Override + public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + final int priority = query.getContextPriority(0); + + if (priority < 0) { + return Optional.of( + Iterables.getFirst( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } + + return Optional.absent(); + } +} diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index 67ff109f217..395dd81e2e6 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -54,6 +54,10 @@ public class TieredBrokerConfig @NotNull private Period pollPeriod = new Period("PT1M"); + @JsonProperty + @NotNull + private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]"; + // tier, public LinkedHashMap getTierToBrokerMap() { @@ -88,4 +92,9 @@ public class TieredBrokerConfig { return pollPeriod; } + + public String getStrategies() + { + return strategies; + } } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index 8ebe2e55050..a28e6ffc2ff 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -19,6 +19,7 @@ package io.druid.server.router; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.inject.Inject; @@ -30,7 +31,6 @@ import io.druid.client.selector.HostSelector; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.query.Query; -import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; @@ -50,6 +50,7 @@ public class TieredBrokerHostSelector implements HostSelector private final TieredBrokerConfig tierConfig; private final ServerDiscoveryFactory serverDiscoveryFactory; private final ConcurrentHashMap selectorMap = new ConcurrentHashMap<>(); + private final List strategies; private final Object lock = new Object(); @@ -59,12 +60,14 @@ public class TieredBrokerHostSelector implements HostSelector public TieredBrokerHostSelector( CoordinatorRuleManager ruleManager, TieredBrokerConfig tierConfig, - ServerDiscoveryFactory serverDiscoveryFactory + ServerDiscoveryFactory serverDiscoveryFactory, + List strategies ) { this.ruleManager = ruleManager; this.tierConfig = tierConfig; this.serverDiscoveryFactory = serverDiscoveryFactory; + this.strategies = strategies; } @LifecycleStart @@ -128,12 +131,12 @@ public class TieredBrokerHostSelector implements HostSelector String brokerServiceName = null; - // Somewhat janky way of always selecting highest priority broker for this type of query - if (query instanceof TimeBoundaryQuery) { - brokerServiceName = Iterables.getFirst( - tierConfig.getTierToBrokerMap().values(), - tierConfig.getDefaultBrokerServiceName() - ); + for (TieredBrokerSelectorStrategy strategy : strategies) { + final Optional optionalName = strategy.getBrokerServiceName(tierConfig, query); + if (optionalName.isPresent()) { + brokerServiceName = optionalName.get(); + break; + } } if (brokerServiceName == null) { diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java new file mode 100644 index 00000000000..3300261b7d4 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Lists; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.google.inject.Provider; + +import java.util.List; + +/** + */ +public class TieredBrokerSelectorStrategiesProvider implements Provider> +{ + private final List strategies = Lists.newArrayList(); + + @Inject + public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config) + { + try { + this.strategies.addAll( + (List) jsonMapper.readValue( + config.getStrategies(), new TypeReference>() + { + } + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public List get() + { + return strategies; + } +} diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java new file mode 100644 index 00000000000..bccf4d0d942 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java @@ -0,0 +1,19 @@ +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Optional; +import io.druid.query.Query; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class), + @JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class) +}) + +public interface TieredBrokerSelectorStrategy +{ + public Optional getBrokerServiceName(TieredBrokerConfig config, Query query); +} diff --git a/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java new file mode 100644 index 00000000000..135e87c1411 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java @@ -0,0 +1,46 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import io.druid.query.Query; +import io.druid.query.timeboundary.TimeBoundaryQuery; + +/** + */ +public class TimeBoundaryTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +{ + @Override + public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + // Somewhat janky way of always selecting highest priority broker for this type of query + if (query instanceof TimeBoundaryQuery) { + return Optional.of( + Iterables.getFirst( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } + + return Optional.absent(); + } +} diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 6d058d200fa..3d1d0cd0bb0 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -84,7 +84,8 @@ public class TieredBrokerHostSelectorTest return "hotBroker"; } }, - factory + factory, + Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy()) ); EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); EasyMock.replay(factory); @@ -196,6 +197,30 @@ public class TieredBrokerHostSelectorTest Assert.assertEquals("coldBroker", brokerName); } + @Test + public void testPrioritySelect() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", -1)) + .build() + ).lhs; + + Assert.assertEquals("hotBroker", brokerName); + } + + private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 681ae78fa27..c9a2f64fb7b 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.RoutingDruidClient; @@ -42,6 +43,8 @@ import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import io.druid.server.router.TieredBrokerConfig; import io.druid.server.router.TieredBrokerHostSelector; +import io.druid.server.router.TieredBrokerSelectorStrategiesProvider; +import io.druid.server.router.TieredBrokerSelectorStrategy; import org.eclipse.jetty.server.Server; import java.util.List; @@ -79,6 +82,10 @@ public class CliRouter extends ServerRunnable binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class); binder.bind(QueryHostFinder.class).in(LazySingleton.class); binder.bind(RoutingDruidClient.class).in(LazySingleton.class); + binder.bind(new TypeLiteral>(){}) + .toProvider(TieredBrokerSelectorStrategiesProvider.class) + .in(LazySingleton.class); + binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);