mirror of https://github.com/apache/druid.git
Smarter server selection at the router level
This commit is contained in:
parent
f88cb13ccb
commit
3c3daf452e
|
@ -27,7 +27,7 @@ import com.metamx.common.logger.Logger;
|
|||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.client.selector.ServerSelectorStrategy;
|
||||
import io.druid.client.selector.TierSelectorStrategy;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.query.DataSource;
|
||||
|
@ -61,7 +61,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
private final ObjectMapper smileMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final ServerView baseView;
|
||||
private final ServerSelectorStrategy serverSelectorStrategy;
|
||||
private final TierSelectorStrategy tierSelectorStrategy;
|
||||
|
||||
@Inject
|
||||
public BrokerServerView(
|
||||
|
@ -69,14 +69,14 @@ public class BrokerServerView implements TimelineServerView
|
|||
ObjectMapper smileMapper,
|
||||
@Client HttpClient httpClient,
|
||||
ServerView baseView,
|
||||
ServerSelectorStrategy serverSelectorStrategy
|
||||
TierSelectorStrategy tierSelectorStrategy
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.smileMapper = smileMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.baseView = baseView;
|
||||
this.serverSelectorStrategy = serverSelectorStrategy;
|
||||
this.tierSelectorStrategy = tierSelectorStrategy;
|
||||
|
||||
this.clients = Maps.newConcurrentMap();
|
||||
this.selectors = Maps.newHashMap();
|
||||
|
@ -171,7 +171,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
ServerSelector selector = selectors.get(segmentId);
|
||||
if (selector == null) {
|
||||
selector = new ServerSelector(segment, serverSelectorStrategy);
|
||||
selector = new ServerSelector(segment, tierSelectorStrategy);
|
||||
|
||||
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
|
||||
if (timeline == null) {
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class AbstractTierSelectorStrategy implements TierSelectorStrategy
|
||||
{
|
||||
private final ServerSelectorStrategy serverSelectorStrategy;
|
||||
|
||||
public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
|
||||
{
|
||||
this.serverSelectorStrategy = serverSelectorStrategy;
|
||||
}
|
||||
|
||||
public abstract Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers);
|
||||
|
||||
@Override
|
||||
public QueryableDruidServer pick(
|
||||
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
|
||||
)
|
||||
{
|
||||
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = getServers(prioritizedServers);
|
||||
|
||||
if (priorityServers == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Set<QueryableDruidServer> servers = priorityServers.getValue();
|
||||
switch (servers.size()) {
|
||||
case 0:
|
||||
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
|
||||
case 1:
|
||||
return priorityServers.getValue().iterator().next();
|
||||
default:
|
||||
return serverSelectorStrategy.pick(servers, segment);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,14 +20,11 @@
|
|||
package io.druid.client.selector;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
|
||||
{
|
||||
|
@ -41,25 +38,8 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
|
|||
};
|
||||
|
||||
@Override
|
||||
public QueryableDruidServer pick(
|
||||
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
|
||||
)
|
||||
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
|
||||
{
|
||||
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
|
||||
|
||||
if (highestPriorityServers == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
|
||||
final int size = servers.size();
|
||||
switch (size) {
|
||||
case 0:
|
||||
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
|
||||
case 1:
|
||||
return highestPriorityServers.getValue().iterator().next();
|
||||
default:
|
||||
return Collections.min(servers, comparator);
|
||||
}
|
||||
return Collections.min(servers, comparator);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HighestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
|
||||
{
|
||||
@Inject
|
||||
public HighestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
|
||||
{
|
||||
super(serverSelectorStrategy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers)
|
||||
{
|
||||
return prioritizedServers.pollLastEntry();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class LowestPriorityTierSelectorStrategy extends AbstractTierSelectorStrategy
|
||||
{
|
||||
@Inject
|
||||
public LowestPriorityTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
|
||||
{
|
||||
super(serverSelectorStrategy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map.Entry<Integer, Set<QueryableDruidServer>> getServers(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers)
|
||||
{
|
||||
return prioritizedServers.pollFirstEntry();
|
||||
}
|
||||
}
|
|
@ -20,36 +20,18 @@
|
|||
package io.druid.client.selector;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
|
||||
{
|
||||
private static final Random random = new Random();
|
||||
|
||||
@Override
|
||||
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
|
||||
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
|
||||
{
|
||||
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
|
||||
|
||||
if (highestPriorityServers == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
|
||||
final int size = servers.size();
|
||||
switch (size) {
|
||||
case 0:
|
||||
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
|
||||
case 1:
|
||||
return highestPriorityServers.getValue().iterator().next();
|
||||
default:
|
||||
return Iterators.get(servers.iterator(), random.nextInt(size));
|
||||
}
|
||||
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,11 +37,11 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
|||
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
|
||||
|
||||
private final DataSegment segment;
|
||||
private final ServerSelectorStrategy strategy;
|
||||
private final TierSelectorStrategy strategy;
|
||||
|
||||
public ServerSelector(
|
||||
DataSegment segment,
|
||||
ServerSelectorStrategy strategy
|
||||
TierSelectorStrategy strategy
|
||||
)
|
||||
{
|
||||
this.segment = segment;
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
|||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
|
||||
@JsonSubTypes(value = {
|
||||
|
@ -33,5 +32,5 @@ import java.util.TreeMap;
|
|||
})
|
||||
public interface ServerSelectorStrategy
|
||||
{
|
||||
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
|
||||
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.selector;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HighestPriorityTierSelectorStrategy.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "highestPriority", value = HighestPriorityTierSelectorStrategy.class),
|
||||
@JsonSubTypes.Type(name = "lowestPriority", value = LowestPriorityTierSelectorStrategy.class)
|
||||
})
|
||||
public interface TierSelectorStrategy
|
||||
{
|
||||
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
|
||||
}
|
|
@ -38,6 +38,7 @@ import com.metamx.common.guava.nary.TrinaryFn;
|
|||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.MapCache;
|
||||
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.RandomServerSelectorStrategy;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
|
@ -152,7 +153,7 @@ public class CachingClusteredClientTest
|
|||
"*",
|
||||
Arrays.<PostAggregator>asList(
|
||||
new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
|
||||
new ConstantPostAggregator("constant", 2, 2 )
|
||||
new ConstantPostAggregator("constant", 2, 2)
|
||||
)
|
||||
),
|
||||
new ArithmeticPostAggregator(
|
||||
|
@ -160,7 +161,7 @@ public class CachingClusteredClientTest
|
|||
"/",
|
||||
Arrays.<PostAggregator>asList(
|
||||
new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
|
||||
new ConstantPostAggregator("constant", 2, 2 )
|
||||
new ConstantPostAggregator("constant", 2, 2)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -585,7 +586,8 @@ public class CachingClusteredClientTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOnPostAggMetricCaching() {
|
||||
public void testTopNOnPostAggMetricCaching()
|
||||
{
|
||||
final TopNQueryBuilder builder = new TopNQueryBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.dimension(TOP_DIM)
|
||||
|
@ -914,7 +916,10 @@ public class CachingClusteredClientTest
|
|||
);
|
||||
serverExpectations.get(lastServer).addExpectation(expectation);
|
||||
|
||||
ServerSelector selector = new ServerSelector(expectation.getSegment(), new RandomServerSelectorStrategy());
|
||||
ServerSelector selector = new ServerSelector(
|
||||
expectation.getSegment(),
|
||||
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
|
||||
);
|
||||
selector.addServer(new QueryableDruidServer(lastServer, null));
|
||||
|
||||
final PartitionChunk<ServerSelector> chunk;
|
||||
|
@ -1097,16 +1102,16 @@ public class CachingClusteredClientTest
|
|||
(DateTime) objects[i],
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("rows", objects[i + 1])
|
||||
.put("imps", objects[i + 2])
|
||||
.put("impers", objects[i + 2])
|
||||
.put("avg_imps_per_row",avg_impr)
|
||||
.put("avg_imps_per_row_half",avg_impr / 2)
|
||||
.put("avg_imps_per_row_double",avg_impr * 2)
|
||||
.build()
|
||||
)
|
||||
.put("rows", objects[i + 1])
|
||||
.put("imps", objects[i + 2])
|
||||
.put("impers", objects[i + 2])
|
||||
.put("avg_imps_per_row", avg_impr)
|
||||
.put("avg_imps_per_row_half", avg_impr / 2)
|
||||
.put("avg_imps_per_row_double", avg_impr * 2)
|
||||
.build()
|
||||
)
|
||||
);
|
||||
)
|
||||
);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
@ -1186,14 +1191,14 @@ public class CachingClusteredClientTest
|
|||
final double rows = ((Number) objects[index + 1]).doubleValue();
|
||||
values.add(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put(TOP_DIM, objects[index])
|
||||
.put("rows", rows)
|
||||
.put("imps", imps)
|
||||
.put("impers", imps)
|
||||
.put("avg_imps_per_row", imps / rows)
|
||||
.put("avg_imps_per_row_double", ((imps * 2) / rows))
|
||||
.put("avg_imps_per_row_half", (imps / (rows * 2)))
|
||||
.build()
|
||||
.put(TOP_DIM, objects[index])
|
||||
.put("rows", rows)
|
||||
.put("imps", imps)
|
||||
.put("impers", imps)
|
||||
.put("avg_imps_per_row", imps / rows)
|
||||
.put("avg_imps_per_row_double", ((imps * 2) / rows))
|
||||
.put("avg_imps_per_row_half", (imps / (rows * 2)))
|
||||
.build()
|
||||
);
|
||||
index += 3;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.metamx.http.client.HttpClient;
|
|||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.RequestBuilder;
|
||||
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
|
||||
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -87,7 +88,7 @@ public class DirectDruidClientTest
|
|||
0,
|
||||
0L
|
||||
),
|
||||
new ConnectionCountServerSelectorStrategy()
|
||||
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
|
||||
);
|
||||
|
||||
DirectDruidClient client1 = new DirectDruidClient(
|
||||
|
|
|
@ -83,7 +83,8 @@ public class CliBroker extends ServerRunnable
|
|||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.tier", ServerSelectorStrategy.class);
|
||||
JsonConfigProvider.bind(binder, "druid.broker.select.server", ServerSelectorStrategy.class);
|
||||
|
||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||
|
||||
|
|
Loading…
Reference in New Issue