Merge pull request #518 from metamx/server-select

Smarter server selection at the broker level
This commit is contained in:
fjy 2014-05-05 17:09:15 -06:00
commit 42c2fcb6b4
14 changed files with 339 additions and 75 deletions

View File

@ -27,7 +27,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Client;
import io.druid.query.DataSource; import io.druid.query.DataSource;
@ -61,7 +61,7 @@ public class BrokerServerView implements TimelineServerView
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
private final HttpClient httpClient; private final HttpClient httpClient;
private final ServerView baseView; private final ServerView baseView;
private final ServerSelectorStrategy serverSelectorStrategy; private final TierSelectorStrategy tierSelectorStrategy;
@Inject @Inject
public BrokerServerView( public BrokerServerView(
@ -69,14 +69,14 @@ public class BrokerServerView implements TimelineServerView
ObjectMapper smileMapper, ObjectMapper smileMapper,
@Client HttpClient httpClient, @Client HttpClient httpClient,
ServerView baseView, ServerView baseView,
ServerSelectorStrategy serverSelectorStrategy TierSelectorStrategy tierSelectorStrategy
) )
{ {
this.warehouse = warehouse; this.warehouse = warehouse;
this.smileMapper = smileMapper; this.smileMapper = smileMapper;
this.httpClient = httpClient; this.httpClient = httpClient;
this.baseView = baseView; this.baseView = baseView;
this.serverSelectorStrategy = serverSelectorStrategy; this.tierSelectorStrategy = tierSelectorStrategy;
this.clients = Maps.newConcurrentMap(); this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap(); this.selectors = Maps.newHashMap();
@ -171,7 +171,7 @@ public class BrokerServerView implements TimelineServerView
ServerSelector selector = selectors.get(segmentId); ServerSelector selector = selectors.get(segmentId);
if (selector == null) { if (selector == null) {
selector = new ServerSelector(segment, serverSelectorStrategy); selector = new ServerSelector(segment, tierSelectorStrategy);
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource()); VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) { if (timeline == null) {

View File

@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
/**
*/
public abstract class AbstractTierSelectorStrategy implements TierSelectorStrategy
{
private final ServerSelectorStrategy serverSelectorStrategy;
public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
{
this.serverSelectorStrategy = serverSelectorStrategy;
}
@Override
public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
)
{
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry();
if (priorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = priorityServers.getValue();
switch (servers.size()) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return priorityServers.getValue().iterator().next();
default:
return serverSelectorStrategy.pick(servers, segment);
}
}
}

View File

@ -20,14 +20,11 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{ {
@ -41,25 +38,8 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
}; };
@Override @Override
public QueryableDruidServer pick( public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
)
{ {
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Collections.min(servers, comparator); return Collections.min(servers, comparator);
} }
} }
}

View File

@ -0,0 +1,48 @@
package io.druid.client.selector;
import com.google.api.client.util.Maps;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import java.util.Comparator;
import java.util.Map;
/**
*/
public class CustomTierSelectorStrategy extends AbstractTierSelectorStrategy
{
private final Comparator<Integer> comparator;
@Inject
public CustomTierSelectorStrategy(
ServerSelectorStrategy serverSelectorStrategy,
CustomTierSelectorStrategyConfig config
)
{
super(serverSelectorStrategy);
final Map<Integer, Integer> lookup = Maps.newHashMap();
int pos = 0;
for (Integer integer : config.getPriorities()) {
lookup.put(integer, pos);
pos++;
}
this.comparator = new Comparator<Integer>()
{
@Override
public int compare(Integer o1, Integer o2)
{
int pos1 = lookup.get(o1);
int pos2 = lookup.get(o2);
return Ints.compare(pos1, pos2);
}
};
}
@Override
public Comparator<Integer> getComparator()
{
return comparator;
}
}

View File

@ -0,0 +1,38 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Lists;
import java.util.List;
/**
*/
public class CustomTierSelectorStrategyConfig
{
@JsonProperty
private List<Integer> priorities = Lists.newArrayList();
public List<Integer> getPriorities()
{
return priorities;
}
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import java.util.Comparator;
/**
*/
public class HighestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
{
private static final Comparator<Integer> comparator = new Comparator<Integer>()
{
@Override
public int compare(Integer o1, Integer o2)
{
return -Ints.compare(o1, o2);
}
};
@Inject
public HighestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
{
super(serverSelectorStrategy);
}
@Override
public Comparator<Integer> getComparator()
{
return comparator;
}
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import java.util.Comparator;
/**
*/
public class LowestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
{
private static final Comparator<Integer> comparator = new Comparator<Integer>()
{
@Override
public int compare(Integer o1, Integer o2)
{
return Ints.compare(o1, o2);
}
};
@Inject
public LowestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
{
super(serverSelectorStrategy);
}
@Override
public Comparator<Integer> getComparator()
{
return comparator;
}
}

View File

@ -20,36 +20,18 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{ {
private static final Random random = new Random(); private static final Random random = new Random();
@Override @Override
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment) public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{ {
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry(); return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Iterators.get(servers.iterator(), random.nextInt(size));
}
} }
} }

View File

@ -37,11 +37,11 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
private final Set<QueryableDruidServer> servers = Sets.newHashSet(); private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment; private final DataSegment segment;
private final ServerSelectorStrategy strategy; private final TierSelectorStrategy strategy;
public ServerSelector( public ServerSelector(
DataSegment segment, DataSegment segment,
ServerSelectorStrategy strategy TierSelectorStrategy strategy
) )
{ {
this.segment = segment; this.segment = segment;
@ -79,7 +79,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
public QueryableDruidServer pick() public QueryableDruidServer pick()
{ {
synchronized (this) { synchronized (this) {
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = Maps.newTreeMap(); final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) { for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority()); Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) { if (theServers == null) {

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = { @JsonSubTypes(value = {
@ -33,5 +32,5 @@ import java.util.TreeMap;
}) })
public interface ServerSelectorStrategy public interface ServerSelectorStrategy
{ {
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment); public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
} }

View File

@ -0,0 +1,43 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeMap;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HighestPriorityTierSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "highestPriority", value = HighestPriorityTierSelectorStrategy.class),
@JsonSubTypes.Type(name = "lowestPriority", value = LowestPriorityTierSelectorStrategy.class),
@JsonSubTypes.Type(name = "custom", value = CustomTierSelectorStrategy.class)
})
public interface TierSelectorStrategy
{
public Comparator<Integer> getComparator();
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
}

View File

@ -38,6 +38,7 @@ import com.metamx.common.guava.nary.TrinaryFn;
import io.druid.client.cache.Cache; import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache; import io.druid.client.cache.MapCache;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.RandomServerSelectorStrategy; import io.druid.client.selector.RandomServerSelectorStrategy;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
@ -585,7 +586,8 @@ public class CachingClusteredClientTest
} }
@Test @Test
public void testTopNOnPostAggMetricCaching() { public void testTopNOnPostAggMetricCaching()
{
final TopNQueryBuilder builder = new TopNQueryBuilder() final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource(DATA_SOURCE) .dataSource(DATA_SOURCE)
.dimension(TOP_DIM) .dimension(TOP_DIM)
@ -914,7 +916,10 @@ public class CachingClusteredClientTest
); );
serverExpectations.get(lastServer).addExpectation(expectation); serverExpectations.get(lastServer).addExpectation(expectation);
ServerSelector selector = new ServerSelector(expectation.getSegment(), new RandomServerSelectorStrategy()); ServerSelector selector = new ServerSelector(
expectation.getSegment(),
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServer(new QueryableDruidServer(lastServer, null)); selector.addServer(new QueryableDruidServer(lastServer, null));
final PartitionChunk<ServerSelector> chunk; final PartitionChunk<ServerSelector> chunk;

View File

@ -28,6 +28,7 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request; import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder; import com.metamx.http.client.RequestBuilder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
@ -87,7 +88,7 @@ public class DirectDruidClientTest
0, 0,
0L 0L
), ),
new ConnectionCountServerSelectorStrategy() new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
); );
DirectDruidClient client1 = new DirectDruidClient( DirectDruidClient client1 = new DirectDruidClient(

View File

@ -31,7 +31,9 @@ import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheMonitor; import io.druid.client.cache.CacheMonitor;
import io.druid.client.cache.CacheProvider; import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
@ -83,7 +85,9 @@ public class CliBroker extends ServerRunnable
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class); binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class); JsonConfigProvider.bind(binder, "druid.broker.select.tier", TierSelectorStrategy.class);
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);
JsonConfigProvider.bind(binder, "druid.broker.select.server", ServerSelectorStrategy.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);