allow router to override selection based on config

This commit is contained in:
fjy 2014-07-22 10:12:02 -07:00
parent a1ea56ad12
commit 8f2fc595ee
8 changed files with 223 additions and 9 deletions

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
/**
*/
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);
if (priority < 0) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}
return Optional.absent();
}
}

View File

@ -54,6 +54,10 @@ public class TieredBrokerConfig
@NotNull @NotNull
private Period pollPeriod = new Period("PT1M"); private Period pollPeriod = new Period("PT1M");
@JsonProperty
@NotNull
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";
// tier, <bard, numThreads> // tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap() public LinkedHashMap<String, String> getTierToBrokerMap()
{ {
@ -88,4 +92,9 @@ public class TieredBrokerConfig
{ {
return pollPeriod; return pollPeriod;
} }
public String getStrategies()
{
return strategies;
}
} }

View File

@ -19,6 +19,7 @@
package io.druid.server.router; package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -30,7 +31,6 @@ import io.druid.client.selector.HostSelector;
import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -50,6 +50,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
private final TieredBrokerConfig tierConfig; private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory; private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final List<TieredBrokerSelectorStrategy> strategies;
private final Object lock = new Object(); private final Object lock = new Object();
@ -59,12 +60,14 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
public TieredBrokerHostSelector( public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager, CoordinatorRuleManager ruleManager,
TieredBrokerConfig tierConfig, TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory ServerDiscoveryFactory serverDiscoveryFactory,
List<TieredBrokerSelectorStrategy> strategies
) )
{ {
this.ruleManager = ruleManager; this.ruleManager = ruleManager;
this.tierConfig = tierConfig; this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory; this.serverDiscoveryFactory = serverDiscoveryFactory;
this.strategies = strategies;
} }
@LifecycleStart @LifecycleStart
@ -128,12 +131,12 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
String brokerServiceName = null; String brokerServiceName = null;
// Somewhat janky way of always selecting highest priority broker for this type of query for (TieredBrokerSelectorStrategy strategy : strategies) {
if (query instanceof TimeBoundaryQuery) { final Optional<String> optionalName = strategy.getBrokerServiceName(tierConfig, query);
brokerServiceName = Iterables.getFirst( if (optionalName.isPresent()) {
tierConfig.getTierToBrokerMap().values(), brokerServiceName = optionalName.get();
tierConfig.getDefaultBrokerServiceName() break;
); }
} }
if (brokerServiceName == null) { if (brokerServiceName == null) {

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.List;
/**
*/
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
{
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList();
@Inject
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
{
try {
this.strategies.addAll(
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public List<TieredBrokerSelectorStrategy> get()
{
return strategies;
}
}

View File

@ -0,0 +1,19 @@
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import io.druid.query.Query;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class)
})
public interface TieredBrokerSelectorStrategy
{
public Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query query);
}

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
/**
*/
public class TimeBoundaryTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}
return Optional.absent();
}
}

View File

@ -84,7 +84,8 @@ public class TieredBrokerHostSelectorTest
return "hotBroker"; return "hotBroker";
} }
}, },
factory factory,
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy())
); );
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce(); EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory); EasyMock.replay(factory);
@ -196,6 +197,30 @@ public class TieredBrokerHostSelectorTest
Assert.assertEquals("coldBroker", brokerName); Assert.assertEquals("coldBroker", brokerName);
} }
@Test
public void testPrioritySelect() throws Exception
{
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.intervals(
new MultipleIntervalSegmentSpec(
Arrays.<Interval>asList(
new Interval("2011-08-31/2011-09-01"),
new Interval("2012-08-31/2012-09-01"),
new Interval("2013-08-31/2013-09-01")
)
)
)
.context(ImmutableMap.<String, Object>of("priority", -1))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
}
private static class TestRuleManager extends CoordinatorRuleManager private static class TestRuleManager extends CoordinatorRuleManager
{ {
public TestRuleManager( public TestRuleManager(

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.client.RoutingDruidClient; import io.druid.client.RoutingDruidClient;
@ -42,6 +43,8 @@ import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router; import io.druid.server.router.Router;
import io.druid.server.router.TieredBrokerConfig; import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector; import io.druid.server.router.TieredBrokerHostSelector;
import io.druid.server.router.TieredBrokerSelectorStrategiesProvider;
import io.druid.server.router.TieredBrokerSelectorStrategy;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import java.util.List; import java.util.List;
@ -79,6 +82,10 @@ public class CliRouter extends ServerRunnable
binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class); binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
binder.bind(QueryHostFinder.class).in(LazySingleton.class); binder.bind(QueryHostFinder.class).in(LazySingleton.class);
binder.bind(RoutingDruidClient.class).in(LazySingleton.class); binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>(){})
.toProvider(TieredBrokerSelectorStrategiesProvider.class)
.in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);