diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManager.java b/server/src/main/java/io/druid/db/DatabaseRuleManager.java index 9a1c014b36f..5882c964346 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManager.java @@ -159,7 +159,7 @@ public class DatabaseRuleManager this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); - createDefaultRule(dbi, getRulesTable(), config.get().getDefaultTier(), jsonMapper); + createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper); ScheduledExecutors.scheduleWithFixedDelay( exec, new Duration(0), @@ -274,8 +274,8 @@ public class DatabaseRuleManager if (theRules.get(dataSource) != null) { retVal.addAll(theRules.get(dataSource)); } - if (theRules.get(config.get().getDefaultTier()) != null) { - retVal.addAll(theRules.get(config.get().getDefaultTier())); + if (theRules.get(config.get().getDefaultRule()) != null) { + retVal.addAll(theRules.get(config.get().getDefaultRule())); } return retVal; } diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java b/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java index 8b770a57955..63f506649b5 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManagerConfig.java @@ -27,14 +27,14 @@ import org.joda.time.Period; public class DatabaseRuleManagerConfig { @JsonProperty - private String defaultTier = "_default"; + private String defaultRule = "_default"; @JsonProperty private Period pollDuration = new Period("PT1M"); - public String getDefaultTier() + public String getDefaultRule() { - return defaultTier; + return defaultRule; } public Period getPollDuration() diff --git a/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java b/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java index dfc25bea283..0296f25867e 100644 --- a/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java +++ b/server/src/main/java/io/druid/db/DatabaseRuleManagerProvider.java @@ -64,7 +64,7 @@ public class DatabaseRuleManagerProvider implements Provider +{ + private final CoordinatorRuleManager ruleManager; + private final TierConfig tierConfig; + + @Inject + public BrokerSelector(CoordinatorRuleManager ruleManager, TierConfig tierConfig) + { + this.ruleManager = ruleManager; + this.tierConfig = tierConfig; + } + + public String select(final Query query) + { + if (!ruleManager.isStarted()) { + return null; + } + + List rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName()); + + // find the rule that can apply to the entire set of intervals + DateTime now = new DateTime(); + int lastRulePosition = -1; + LoadRule baseRule = null; + + for (Interval interval : query.getIntervals()) { + int currRulePosition = 0; + for (Rule rule : rules) { + if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) { + lastRulePosition = currRulePosition; + baseRule = (LoadRule) rule; + break; + } + currRulePosition++; + } + } + + if (baseRule == null) { + return null; + } + + // in the baseRule, find the broker of highest priority + String brokerName = null; + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + if (baseRule.getTieredReplicants().containsKey(entry.getKey())) { + brokerName = entry.getValue(); + break; + } + } + + return brokerName; + } +} diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java new file mode 100644 index 00000000000..9acf5f94858 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -0,0 +1,193 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Charsets; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.client.selector.Server; +import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Json; +import io.druid.server.coordinator.rules.Rule; +import org.joda.time.Duration; + +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +@ManageLifecycle +public class CoordinatorRuleManager +{ + private static final Logger log = new Logger(CoordinatorRuleManager.class); + + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + private final Supplier config; + private final ServerDiscoverySelector selector; + + private final StatusResponseHandler responseHandler; + private final AtomicReference>> rules; + + private volatile ScheduledExecutorService exec; + + private final Object lock = new Object(); + + private volatile boolean started = false; + + @Inject + public CoordinatorRuleManager( + @Global HttpClient httpClient, + @Json ObjectMapper jsonMapper, + Supplier config, + ServerDiscoverySelector selector + ) + { + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + this.config = config; + this.selector = selector; + + this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); + this.rules = new AtomicReference<>( + new ConcurrentHashMap>() + ); + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + this.exec = Execs.scheduledSingleThreaded("CoordinatorRuleManager-Exec--%d"); + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + new Duration(0), + config.get().getPollPeriod().toStandardDuration(), + new Runnable() + { + @Override + public void run() + { + poll(); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + rules.set(new ConcurrentHashMap>()); + + started = false; + exec.shutdownNow(); + exec = null; + } + } + + public boolean isStarted() + { + return started; + } + + public void poll() + { + try { + String url = getRuleURL(); + if (url == null) { + return; + } + + StatusResponseHolder response = httpClient.get(new URL(url)) + .go(responseHandler) + .get(); + + ConcurrentHashMap> newRules = new ConcurrentHashMap>( + (Map>) jsonMapper.readValue( + response.getContent(), new TypeReference>>() + { + } + ) + ); + + log.info("Got [%,d] rules", newRules.keySet().size()); + + rules.set(newRules); + } + catch (Exception e) { + log.error(e, "Exception while polling for rules"); + } + } + + public List getRulesWithDefault(final String dataSource) + { + List retVal = Lists.newArrayList(); + Map> theRules = rules.get(); + if (theRules.get(dataSource) != null) { + retVal.addAll(theRules.get(dataSource)); + } + if (theRules.get(config.get().getDefaultRule()) != null) { + retVal.addAll(theRules.get(config.get().getDefaultRule())); + } + return retVal; + } + + private String getRuleURL() + { + Server server = selector.pick(); + + if (server == null) { + log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName()); + return null; + } + + return String.format("http://%s%s", server.getHost(), config.get().getRulesEndpoint()); + } +} diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java new file mode 100644 index 00000000000..705e12fb8d6 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java @@ -0,0 +1,86 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.metamx.http.client.HttpClient; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.guice.annotations.Global; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.SegmentDescriptor; +import org.joda.time.Interval; + +/** + */ +public class RouterQuerySegmentWalker implements QuerySegmentWalker +{ + private final QueryToolChestWarehouse warehouse; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + private final BrokerSelector brokerSelector; + private final TierConfig tierConfig; + private final ServerDiscoveryFactory serverDiscoveryFactory; + + @Inject + public RouterQuerySegmentWalker( + QueryToolChestWarehouse warehouse, + ObjectMapper objectMapper, + @Global HttpClient httpClient, + BrokerSelector brokerSelector, + TierConfig tierConfig, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + this.warehouse = warehouse; + this.objectMapper = objectMapper; + this.httpClient = httpClient; + this.brokerSelector = brokerSelector; + this.tierConfig = tierConfig; + this.serverDiscoveryFactory = serverDiscoveryFactory; + } + + @Override + public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) + { + return makeRunner(); + } + + @Override + public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) + { + return makeRunner(); + } + + private QueryRunner makeRunner() + { + return new TierAwareQueryRunner( + warehouse, + objectMapper, + httpClient, + brokerSelector, + tierConfig, + serverDiscoveryFactory + ); + } +} diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java new file mode 100644 index 00000000000..5acb1b6dd8c --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java @@ -0,0 +1,163 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.HttpClient; +import io.druid.client.DirectDruidClient; +import io.druid.client.selector.Server; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryToolChestWarehouse; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class TierAwareQueryRunner implements QueryRunner +{ + private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class); + + private final QueryToolChestWarehouse warehouse; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + private final BrokerSelector brokerSelector; + private final TierConfig tierConfig; + + private final ConcurrentHashMap selectorMap = new ConcurrentHashMap(); + private final ConcurrentHashMap serverBackup = new ConcurrentHashMap(); + + public TierAwareQueryRunner( + QueryToolChestWarehouse warehouse, + ObjectMapper objectMapper, + HttpClient httpClient, + BrokerSelector brokerSelector, + TierConfig tierConfig, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + this.warehouse = warehouse; + this.objectMapper = objectMapper; + this.httpClient = httpClient; + this.brokerSelector = brokerSelector; + this.tierConfig = tierConfig; + + try { + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); + selector.start(); + // TODO: stop? + selectorMap.put(entry.getValue(), selector); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + public Server findServer(Query query) + { + String brokerServiceName = brokerSelector.select(query); + + if (brokerServiceName == null) { + log.error( + "WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].", + query.getDataSource(), + query.getIntervals(), + tierConfig.getDefaultBrokerServiceName() + ); + brokerServiceName = tierConfig.getDefaultBrokerServiceName(); + } + + ServerDiscoverySelector selector = selectorMap.get(brokerServiceName); + + Server server; + if (selector == null) { + log.error( + "WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", + brokerServiceName, + tierConfig.getDefaultBrokerServiceName() + ); + selector = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); + + if (selector != null) { + server = selector.pick(); + } else { + return null; + } + } else { + server = selector.pick(); + } + + if (server == null) { + log.error( + "WTF?! No server found for brokerServiceName[%s]. Using backup", + brokerServiceName + ); + + server = serverBackup.get(brokerServiceName); + + if (server == null) { + log.error( + "WTF?! No backup found for brokerServiceName[%s]. Using default[%s]", + brokerServiceName, + tierConfig.getDefaultBrokerServiceName() + ); + + server = serverBackup.get(tierConfig.getDefaultBrokerServiceName()); + } + } else { + serverBackup.put(brokerServiceName, server); + } + + return server; + } + + @Override + public Sequence run(Query query) + { + Server server = findServer(query); + + if (server == null) { + log.makeAlert( + "Catastrophic failure! No servers found for default broker [%s]!", + tierConfig.getDefaultBrokerServiceName() + ).emit(); + + return Sequences.empty(); + } + + QueryRunner client = new DirectDruidClient( + warehouse, + objectMapper, + httpClient, + server.getHost() + ); + + return client.run(query); + } +} diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TierConfig.java new file mode 100644 index 00000000000..8fba0c6255d --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TierConfig.java @@ -0,0 +1,92 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import io.druid.client.DruidServer; +import org.joda.time.Period; + +import javax.validation.constraints.NotNull; +import java.util.LinkedHashMap; + +/** + */ +public class TierConfig +{ + @JsonProperty + @NotNull + private String defaultBrokerServiceName = ""; + + @JsonProperty + @NotNull + private LinkedHashMap tierToBrokerMap = new LinkedHashMap( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, defaultBrokerServiceName + ) + ); + + @JsonProperty + @NotNull + private String defaultRule = "_default"; + + @JsonProperty + @NotNull + private String rulesEndpoint = "/druid/coordinator/v1/rules"; + + @JsonProperty + @NotNull + private String coordinatorServiceName = null; + + @JsonProperty + @NotNull + private Period pollPeriod = new Period("PT1M"); + + // tier, + public LinkedHashMap getTierToBrokerMap() + { + return tierToBrokerMap; + } + + public String getDefaultBrokerServiceName() + { + return defaultBrokerServiceName; + } + + public String getDefaultRule() + { + return defaultRule; + } + + public String getRulesEndpoint() + { + return rulesEndpoint; + } + + public String getCoordinatorServiceName() + { + return coordinatorServiceName; + } + + public Period getPollPeriod() + { + return pollPeriod; + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 3e75d1b79c3..24542e70df2 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -122,6 +122,12 @@ public class LoadRuleTest { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } }; DruidCluster druidCluster = new DruidCluster( @@ -214,6 +220,12 @@ public class LoadRuleTest { return true; } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } }; DruidServer server1 = new DruidServer( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java new file mode 100644 index 00000000000..7a29594e306 --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -0,0 +1,105 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.logger.Logger; +import io.airlift.command.Command; +import io.druid.curator.discovery.DiscoveryModule; +import io.druid.curator.discovery.ServerDiscoveryFactory; +import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; +import io.druid.query.MapQueryToolChestWarehouse; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.QueryResource; +import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.router.BrokerSelector; +import io.druid.server.router.CoordinatorRuleManager; +import io.druid.server.router.RouterQuerySegmentWalker; +import io.druid.server.router.TierConfig; +import org.eclipse.jetty.server.Server; + +import java.util.List; + +/** + */ +@Command( + name = "router", + description = "Experimental! Understands tiers and routes things to different brokers" +) +public class CliRouter extends ServerRunnable +{ + private static final Logger log = new Logger(CliRouter.class); + + public CliRouter() + { + super(log); + } + + @Override + protected List getModules() + { + return ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.broker", TierConfig.class); + + binder.bind(CoordinatorRuleManager.class); + LifecycleModule.register(binder, CoordinatorRuleManager.class); + + binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + + binder.bind(BrokerSelector.class).in(LazySingleton.class); + binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class); + + binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); + Jerseys.addResource(binder, QueryResource.class); + LifecycleModule.register(binder, QueryResource.class); + + LifecycleModule.register(binder, Server.class); + DiscoveryModule.register(binder, Self.class); + } + + @Provides + @ManageLifecycle + public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( + TierConfig config, + ServerDiscoveryFactory factory + + ) + { + return factory.createSelector(config.getCoordinatorServiceName()); + } + } + ); + } +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 7012e3a3f79..450aa36afe9 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -52,7 +52,7 @@ public class Main .withCommands( CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class, - CliBridge.class + CliBridge.class, CliRouter.class ); builder.withGroup("example")