address code review comments

This commit is contained in:
fjy 2014-03-24 14:53:30 -07:00
parent fcd7522596
commit 75ec8bed9a
7 changed files with 166 additions and 92 deletions

View File

@ -19,34 +19,105 @@
package io.druid.server.router;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class BrokerSelector<T>
{
private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
private final CoordinatorRuleManager ruleManager;
private final TierConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
private final Object lock = new Object();
private volatile boolean started = false;
@Inject
public BrokerSelector(CoordinatorRuleManager ruleManager, TierConfig tierConfig)
public BrokerSelector(
CoordinatorRuleManager ruleManager,
TierConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
this.ruleManager = ruleManager;
this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory;
}
public String select(final Query<T> query)
@LifecycleStart
public void start()
{
if (!ruleManager.isStarted()) {
return null;
synchronized (lock) {
if (started) {
return;
}
try {
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue());
selector.start();
selectorMap.put(entry.getValue(), selector);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
try {
for (ServerDiscoverySelector selector : selectorMap.values()) {
selector.stop();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
started = false;
}
}
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
{
synchronized (lock) {
if (!ruleManager.isStarted() || !started) {
return null;
}
}
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
@ -73,14 +144,35 @@ public class BrokerSelector<T>
}
// in the baseRule, find the broker of highest priority
String brokerName = null;
String brokerServiceName = null;
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
brokerName = entry.getValue();
brokerServiceName = entry.getValue();
break;
}
}
return brokerName;
if (brokerServiceName == null) {
log.makeAlert(
"WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
query.getDataSource(),
query.getIntervals(),
tierConfig.getDefaultBrokerServiceName()
).emit();
brokerServiceName = tierConfig.getDefaultBrokerServiceName();
}
ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);
if (retVal == null) {
log.makeAlert(
"WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]",
brokerServiceName,
tierConfig.getDefaultBrokerServiceName()
).emit();
retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName());
}
return new Pair<>(brokerServiceName, retVal);
}
}

View File

@ -40,7 +40,6 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker
private final HttpClient httpClient;
private final BrokerSelector brokerSelector;
private final TierConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
@Inject
public RouterQuerySegmentWalker(
@ -48,8 +47,7 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker
ObjectMapper objectMapper,
@Global HttpClient httpClient,
BrokerSelector brokerSelector,
TierConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
TierConfig tierConfig
)
{
this.warehouse = warehouse;
@ -57,7 +55,6 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory;
}
@Override
@ -79,8 +76,7 @@ public class RouterQuerySegmentWalker implements QuerySegmentWalker
objectMapper,
httpClient,
brokerSelector,
tierConfig,
serverDiscoveryFactory
tierConfig
);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
@ -48,7 +49,6 @@ public class TierAwareQueryRunner<T> implements QueryRunner<T>
private final BrokerSelector<T> brokerSelector;
private final TierConfig tierConfig;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
public TierAwareQueryRunner(
@ -56,8 +56,7 @@ public class TierAwareQueryRunner<T> implements QueryRunner<T>
ObjectMapper objectMapper,
HttpClient httpClient,
BrokerSelector<T> brokerSelector,
TierConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
TierConfig tierConfig
)
{
this.warehouse = warehouse;
@ -65,54 +64,15 @@ public class TierAwareQueryRunner<T> implements QueryRunner<T>
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
try {
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue());
selector.start();
// TODO: stop?
selectorMap.put(entry.getValue(), selector);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public Server findServer(Query<T> query)
{
String brokerServiceName = brokerSelector.select(query);
if (brokerServiceName == null) {
log.makeAlert(
"WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
query.getDataSource(),
query.getIntervals(),
tierConfig.getDefaultBrokerServiceName()
).emit();
brokerServiceName = tierConfig.getDefaultBrokerServiceName();
}
ServerDiscoverySelector selector = selectorMap.get(brokerServiceName);
Server server;
if (selector == null) {
log.makeAlert(
"WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]",
brokerServiceName,
tierConfig.getDefaultBrokerServiceName()
).emit();
selector = selectorMap.get(tierConfig.getDefaultBrokerServiceName());
if (selector != null) {
server = selector.pick();
} else {
return null;
}
} else {
server = selector.pick();
}
final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
final String brokerServiceName = selected.lhs;
final ServerDiscoverySelector selector = selected.rhs;
Server server = selector.pick();
if (server == null) {
log.error(
"WTF?! No server found for brokerServiceName[%s]. Using backup",

View File

@ -36,12 +36,7 @@ public class TierConfig
private String defaultBrokerServiceName = "";
@JsonProperty
@NotNull
private LinkedHashMap<String, String> tierToBrokerMap = new LinkedHashMap<String, String>(
ImmutableMap.of(
DruidServer.DEFAULT_TIER, defaultBrokerServiceName
)
);
private LinkedHashMap<String, String> tierToBrokerMap;
@JsonProperty
@NotNull
@ -62,7 +57,11 @@ public class TierConfig
// tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap()
{
return tierToBrokerMap;
return tierToBrokerMap == null ? new LinkedHashMap<>(
ImmutableMap.of(
DruidServer.DEFAULT_TIER, defaultBrokerServiceName
)
) : tierToBrokerMap;
}
public String getDefaultBrokerServiceName()

View File

@ -22,8 +22,10 @@ package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Pair;
import com.metamx.http.client.HttpClient;
import io.druid.client.DruidServer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
@ -36,7 +38,9 @@ import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -48,11 +52,16 @@ import java.util.List;
*/
public class BrokerSelectorTest
{
private ServerDiscoveryFactory factory;
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
@Before
public void setUp() throws Exception
{
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = new BrokerSelector(
new TestRuleManager(null, null, null, null),
new TierConfig()
@ -74,20 +83,41 @@ public class BrokerSelectorTest
{
return "hotBroker";
}
}
},
factory
);
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
selector.start();
EasyMock.expectLastCall().atLeastOnce();
selector.stop();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(selector);
brokerSelector.start();
}
@After
public void tearDown() throws Exception
{
brokerSelector.stop();
EasyMock.verify(selector);
EasyMock.verify(factory);
}
@Test
public void testBasicSelect() throws Exception
{
String brokerName = brokerSelector.select(
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
null
)
);
).lhs;
Assert.assertEquals("coldBroker", brokerName);
}
@ -96,13 +126,13 @@ public class BrokerSelectorTest
@Test
public void testBasicSelect2() throws Exception
{
String brokerName = brokerSelector.select(
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
null
)
);
).lhs;
Assert.assertEquals("hotBroker", brokerName);
}
@ -110,7 +140,7 @@ public class BrokerSelectorTest
@Test
public void testSelectMatchesNothing() throws Exception
{
String brokerName = brokerSelector.select(
Pair retVal = brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
@ -118,14 +148,14 @@ public class BrokerSelectorTest
)
);
Assert.assertEquals(null, brokerName);
Assert.assertEquals(null, retVal);
}
@Test
public void testSelectMultiInterval() throws Exception
{
String brokerName = brokerSelector.select(
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
@ -138,7 +168,7 @@ public class BrokerSelectorTest
)
)
).build()
);
).lhs;
Assert.assertEquals("coldBroker", brokerName);
}
@ -146,7 +176,7 @@ public class BrokerSelectorTest
@Test
public void testSelectMultiInterval2() throws Exception
{
String brokerName = brokerSelector.select(
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
@ -159,7 +189,7 @@ public class BrokerSelectorTest
)
)
).build()
);
).lhs;
Assert.assertEquals("coldBroker", brokerName);
}

View File

@ -20,10 +20,11 @@
package io.druid.server.router;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Pair;
import io.druid.client.DruidServer;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
@ -41,16 +42,16 @@ import java.util.LinkedHashMap;
*/
public class TierAwareQueryRunnerTest
{
private ServerDiscoveryFactory factory;
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
private TierConfig config;
private Server server;
@Before
public void setUp() throws Exception
{
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = EasyMock.createMock(BrokerSelector.class);
config = new TierConfig()
{
@ -104,29 +105,25 @@ public class TierAwareQueryRunnerTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(brokerSelector);
EasyMock.verify(selector);
EasyMock.verify(factory);
}
@Test
public void testFindServer() throws Exception
{
selector.start();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(brokerSelector.select(EasyMock.<Query>anyObject())).andReturn(new Pair("hotBroker", selector));
EasyMock.replay(brokerSelector);
EasyMock.expect(selector.pick()).andReturn(server).once();
EasyMock.replay(selector);
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
null,
null,
null,
new BrokerSelector(new CoordinatorRuleManager(null, null, null, null), config),
config,
factory
brokerSelector,
config
);
Server server = queryRunner.findServer(

View File

@ -71,14 +71,14 @@ public class CliRouter extends ServerRunnable
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.broker", TierConfig.class);
JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(BrokerSelector.class).in(LazySingleton.class);
binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);