diff --git a/server/src/main/java/io/druid/server/router/BrokerSelector.java b/server/src/main/java/io/druid/server/router/BrokerSelector.java index 10600ad43e1..e27acf09c10 100644 --- a/server/src/main/java/io/druid/server/router/BrokerSelector.java +++ b/server/src/main/java/io/druid/server/router/BrokerSelector.java @@ -19,34 +19,105 @@ package io.druid.server.router; +import com.google.common.base.Throwables; import com.google.inject.Inject; +import com.metamx.common.Pair; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.query.Query; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** */ public class BrokerSelector { + private static EmittingLogger log = new EmittingLogger(BrokerSelector.class); + private final CoordinatorRuleManager ruleManager; private final TierConfig tierConfig; + private final ServerDiscoveryFactory serverDiscoveryFactory; + private final ConcurrentHashMap selectorMap = new ConcurrentHashMap(); + + private final Object lock = new Object(); + + private volatile boolean started = false; @Inject - public BrokerSelector(CoordinatorRuleManager ruleManager, TierConfig tierConfig) + public BrokerSelector( + CoordinatorRuleManager ruleManager, + TierConfig tierConfig, + ServerDiscoveryFactory serverDiscoveryFactory + ) { this.ruleManager = ruleManager; this.tierConfig = tierConfig; + this.serverDiscoveryFactory = serverDiscoveryFactory; } - public String select(final Query query) + @LifecycleStart + public void start() { - if (!ruleManager.isStarted()) { - return null; + synchronized (lock) { + if (started) { + return; + } + + try { + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); + selector.start(); + selectorMap.put(entry.getValue(), selector); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + started = true; + } + } + + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + try { + for (ServerDiscoverySelector selector : selectorMap.values()) { + selector.stop(); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + started = false; + } + } + + public Pair select(final Query query) + { + synchronized (lock) { + if (!ruleManager.isStarted() || !started) { + return null; + } } List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); @@ -73,14 +144,35 @@ public class BrokerSelector } // in the baseRule, find the broker of highest priority - String brokerName = null; + String brokerServiceName = null; for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { if (baseRule.getTieredReplicants().containsKey(entry.getKey())) { - brokerName = entry.getValue(); + brokerServiceName = entry.getValue(); break; } } - return brokerName; + if (brokerServiceName == null) { + log.makeAlert( + "WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].", + query.getDataSource(), + query.getIntervals(), + tierConfig.getDefaultBrokerServiceName() + ).emit(); + brokerServiceName = tierConfig.getDefaultBrokerServiceName(); + } + + ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); + + if (retVal == null) { + log.makeAlert( + "WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", + brokerServiceName, + tierConfig.getDefaultBrokerServiceName() + ).emit(); + retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); + } + + return new Pair<>(brokerServiceName, retVal); } } diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java index 705e12fb8d6..92bf43135fe 100644 --- a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java @@ -40,7 +40,6 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker private final HttpClient httpClient; private final BrokerSelector brokerSelector; private final TierConfig tierConfig; - private final ServerDiscoveryFactory serverDiscoveryFactory; @Inject public RouterQuerySegmentWalker( @@ -48,8 +47,7 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker ObjectMapper objectMapper, @Global HttpClient httpClient, BrokerSelector brokerSelector, - TierConfig tierConfig, - ServerDiscoveryFactory serverDiscoveryFactory + TierConfig tierConfig ) { this.warehouse = warehouse; @@ -57,7 +55,6 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker this.httpClient = httpClient; this.brokerSelector = brokerSelector; this.tierConfig = tierConfig; - this.serverDiscoveryFactory = serverDiscoveryFactory; } @Override @@ -79,8 +76,7 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker objectMapper, httpClient, brokerSelector, - tierConfig, - serverDiscoveryFactory + tierConfig ); } } diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java index 381d3c2e82d..3a098e0faed 100644 --- a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java +++ b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java @@ -21,6 +21,7 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; +import com.metamx.common.Pair; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.EmittingLogger; @@ -48,7 +49,6 @@ public class TierAwareQueryRunner implements QueryRunner private final BrokerSelector brokerSelector; private final TierConfig tierConfig; - private final ConcurrentHashMap selectorMap = new ConcurrentHashMap(); private final ConcurrentHashMap serverBackup = new ConcurrentHashMap(); public TierAwareQueryRunner( @@ -56,8 +56,7 @@ public class TierAwareQueryRunner implements QueryRunner ObjectMapper objectMapper, HttpClient httpClient, BrokerSelector brokerSelector, - TierConfig tierConfig, - ServerDiscoveryFactory serverDiscoveryFactory + TierConfig tierConfig ) { this.warehouse = warehouse; @@ -65,54 +64,15 @@ public class TierAwareQueryRunner implements QueryRunner this.httpClient = httpClient; this.brokerSelector = brokerSelector; this.tierConfig = tierConfig; - - try { - for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { - ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); - selector.start(); - // TODO: stop? - selectorMap.put(entry.getValue(), selector); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } } public Server findServer(Query query) { - String brokerServiceName = brokerSelector.select(query); - - if (brokerServiceName == null) { - log.makeAlert( - "WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].", - query.getDataSource(), - query.getIntervals(), - tierConfig.getDefaultBrokerServiceName() - ).emit(); - brokerServiceName = tierConfig.getDefaultBrokerServiceName(); - } - - ServerDiscoverySelector selector = selectorMap.get(brokerServiceName); - - Server server; - if (selector == null) { - log.makeAlert( - "WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", - brokerServiceName, - tierConfig.getDefaultBrokerServiceName() - ).emit(); - selector = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); - - if (selector != null) { - server = selector.pick(); - } else { - return null; - } - } else { - server = selector.pick(); - } + final Pair selected = brokerSelector.select(query); + final String brokerServiceName = selected.lhs; + final ServerDiscoverySelector selector = selected.rhs; + Server server = selector.pick(); if (server == null) { log.error( "WTF?! No server found for brokerServiceName[%s]. Using backup", diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TierConfig.java index 8fba0c6255d..c819edfce23 100644 --- a/server/src/main/java/io/druid/server/router/TierConfig.java +++ b/server/src/main/java/io/druid/server/router/TierConfig.java @@ -36,12 +36,7 @@ public class TierConfig private String defaultBrokerServiceName = ""; @JsonProperty - @NotNull - private LinkedHashMap tierToBrokerMap = new LinkedHashMap( - ImmutableMap.of( - DruidServer.DEFAULT_TIER, defaultBrokerServiceName - ) - ); + private LinkedHashMap tierToBrokerMap; @JsonProperty @NotNull @@ -62,7 +57,11 @@ public class TierConfig // tier, public LinkedHashMap getTierToBrokerMap() { - return tierToBrokerMap; + return tierToBrokerMap == null ? new LinkedHashMap<>( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, defaultBrokerServiceName + ) + ) : tierToBrokerMap; } public String getDefaultBrokerServiceName() diff --git a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java b/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java index 4a470d75d51..55e5dda44ea 100644 --- a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java @@ -22,8 +22,10 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import com.metamx.common.Pair; import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; +import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; @@ -36,7 +38,9 @@ import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; import junit.framework.Assert; +import org.easymock.EasyMock; import org.joda.time.Interval; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -48,11 +52,16 @@ import java.util.List; */ public class BrokerSelectorTest { + private ServerDiscoveryFactory factory; + private ServerDiscoverySelector selector; private BrokerSelector brokerSelector; @Before public void setUp() throws Exception { + factory = EasyMock.createMock(ServerDiscoveryFactory.class); + selector = EasyMock.createMock(ServerDiscoverySelector.class); + brokerSelector = new BrokerSelector( new TestRuleManager(null, null, null, null), new TierConfig() @@ -74,20 +83,41 @@ public class BrokerSelectorTest { return "hotBroker"; } - } + }, + factory ); + EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); + EasyMock.replay(factory); + + selector.start(); + EasyMock.expectLastCall().atLeastOnce(); + selector.stop(); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(selector); + + brokerSelector.start(); + + } + + @After + public void tearDown() throws Exception + { + brokerSelector.stop(); + + EasyMock.verify(selector); + EasyMock.verify(factory); } @Test public void testBasicSelect() throws Exception { - String brokerName = brokerSelector.select( + String brokerName = (String) brokerSelector.select( new TimeBoundaryQuery( new TableDataSource("test"), new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2011-08-31/2011-09-01"))), null ) - ); + ).lhs; Assert.assertEquals("coldBroker", brokerName); } @@ -96,13 +126,13 @@ public class BrokerSelectorTest @Test public void testBasicSelect2() throws Exception { - String brokerName = brokerSelector.select( + String brokerName = (String) brokerSelector.select( new TimeBoundaryQuery( new TableDataSource("test"), new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2013-08-31/2013-09-01"))), null ) - ); + ).lhs; Assert.assertEquals("hotBroker", brokerName); } @@ -110,7 +140,7 @@ public class BrokerSelectorTest @Test public void testSelectMatchesNothing() throws Exception { - String brokerName = brokerSelector.select( + Pair retVal = brokerSelector.select( new TimeBoundaryQuery( new TableDataSource("test"), new MultipleIntervalSegmentSpec(Arrays.asList(new Interval("2010-08-31/2010-09-01"))), @@ -118,14 +148,14 @@ public class BrokerSelectorTest ) ); - Assert.assertEquals(null, brokerName); + Assert.assertEquals(null, retVal); } @Test public void testSelectMultiInterval() throws Exception { - String brokerName = brokerSelector.select( + String brokerName = (String) brokerSelector.select( Druids.newTimeseriesQueryBuilder() .dataSource("test") .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) @@ -138,7 +168,7 @@ public class BrokerSelectorTest ) ) ).build() - ); + ).lhs; Assert.assertEquals("coldBroker", brokerName); } @@ -146,7 +176,7 @@ public class BrokerSelectorTest @Test public void testSelectMultiInterval2() throws Exception { - String brokerName = brokerSelector.select( + String brokerName = (String) brokerSelector.select( Druids.newTimeseriesQueryBuilder() .dataSource("test") .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) @@ -159,7 +189,7 @@ public class BrokerSelectorTest ) ) ).build() - ); + ).lhs; Assert.assertEquals("coldBroker", brokerName); } diff --git a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java b/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java index f7301229c46..d788daa2f2b 100644 --- a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java +++ b/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java @@ -20,10 +20,11 @@ package io.druid.server.router; import com.google.common.collect.ImmutableMap; +import com.metamx.common.Pair; import io.druid.client.DruidServer; import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; import io.druid.query.TableDataSource; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.timeboundary.TimeBoundaryQuery; @@ -41,16 +42,16 @@ import java.util.LinkedHashMap; */ public class TierAwareQueryRunnerTest { - private ServerDiscoveryFactory factory; private ServerDiscoverySelector selector; + private BrokerSelector brokerSelector; private TierConfig config; private Server server; @Before public void setUp() throws Exception { - factory = EasyMock.createMock(ServerDiscoveryFactory.class); selector = EasyMock.createMock(ServerDiscoverySelector.class); + brokerSelector = EasyMock.createMock(BrokerSelector.class); config = new TierConfig() { @@ -104,29 +105,25 @@ public class TierAwareQueryRunnerTest @After public void tearDown() throws Exception { + EasyMock.verify(brokerSelector); EasyMock.verify(selector); - EasyMock.verify(factory); } @Test public void testFindServer() throws Exception { - selector.start(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(brokerSelector.select(EasyMock.anyObject())).andReturn(new Pair("hotBroker", selector)); + EasyMock.replay(brokerSelector); + EasyMock.expect(selector.pick()).andReturn(server).once(); EasyMock.replay(selector); - EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); - EasyMock.replay(factory); - - TierAwareQueryRunner queryRunner = new TierAwareQueryRunner( null, null, null, - new BrokerSelector(new CoordinatorRuleManager(null, null, null, null), config), - config, - factory + brokerSelector, + config ); Server server = queryRunner.findServer( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 7a29594e306..740edd8b55b 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -71,14 +71,14 @@ public class CliRouter extends ServerRunnable @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.broker", TierConfig.class); + JsonConfigProvider.bind(binder, "druid.router", TierConfig.class); binder.bind(CoordinatorRuleManager.class); LifecycleModule.register(binder, CoordinatorRuleManager.class); binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); - binder.bind(BrokerSelector.class).in(LazySingleton.class); + binder.bind(BrokerSelector.class).in(ManageLifecycle.class); binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);