mirror of https://github.com/apache/druid.git
make server selection configurable, add back random server selection
This commit is contained in:
parent
3d734944ef
commit
632d6b0a78
|
@ -25,8 +25,9 @@ import com.google.common.collect.Ordering;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import io.druid.client.selector.QueryableDruidServer;
|
|
||||||
import io.druid.client.selector.ServerSelector;
|
import io.druid.client.selector.ServerSelector;
|
||||||
|
import io.druid.client.selector.QueryableDruidServer;
|
||||||
|
import io.druid.client.selector.ServerSelectorStrategy;
|
||||||
import io.druid.concurrent.Execs;
|
import io.druid.concurrent.Execs;
|
||||||
import io.druid.guice.annotations.Client;
|
import io.druid.guice.annotations.Client;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
|
@ -57,19 +58,22 @@ public class BrokerServerView implements TimelineServerView
|
||||||
private final ObjectMapper smileMapper;
|
private final ObjectMapper smileMapper;
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
private final ServerView baseView;
|
private final ServerView baseView;
|
||||||
|
private final ServerSelectorStrategy serverSelectorStrategy;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public BrokerServerView(
|
public BrokerServerView(
|
||||||
QueryToolChestWarehouse warehose,
|
QueryToolChestWarehouse warehose,
|
||||||
ObjectMapper smileMapper,
|
ObjectMapper smileMapper,
|
||||||
@Client HttpClient httpClient,
|
@Client HttpClient httpClient,
|
||||||
ServerView baseView
|
ServerView baseView,
|
||||||
|
ServerSelectorStrategy serverSelectorStrategy
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.warehose = warehose;
|
this.warehose = warehose;
|
||||||
this.smileMapper = smileMapper;
|
this.smileMapper = smileMapper;
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.baseView = baseView;
|
this.baseView = baseView;
|
||||||
|
this.serverSelectorStrategy = serverSelectorStrategy;
|
||||||
|
|
||||||
this.clients = Maps.newConcurrentMap();
|
this.clients = Maps.newConcurrentMap();
|
||||||
this.selectors = Maps.newHashMap();
|
this.selectors = Maps.newHashMap();
|
||||||
|
@ -164,7 +168,7 @@ public class BrokerServerView implements TimelineServerView
|
||||||
|
|
||||||
ServerSelector selector = selectors.get(segmentId);
|
ServerSelector selector = selectors.get(segmentId);
|
||||||
if (selector == null) {
|
if (selector == null) {
|
||||||
selector = new ServerSelector(segment);
|
selector = new ServerSelector(segment, serverSelectorStrategy);
|
||||||
|
|
||||||
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
|
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
|
||||||
if (timeline == null) {
|
if (timeline == null) {
|
||||||
|
|
|
@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.client.cache.Cache;
|
import io.druid.client.cache.Cache;
|
||||||
import io.druid.client.selector.QueryableDruidServer;
|
|
||||||
import io.druid.client.selector.ServerSelector;
|
import io.druid.client.selector.ServerSelector;
|
||||||
|
import io.druid.client.selector.QueryableDruidServer;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
import io.druid.query.BySegmentResultValueClass;
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
import io.druid.query.CacheStrategy;
|
import io.druid.query.CacheStrategy;
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.selector;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
|
||||||
|
{
|
||||||
|
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public int compare(QueryableDruidServer left, QueryableDruidServer right)
|
||||||
|
{
|
||||||
|
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
|
||||||
|
{
|
||||||
|
return Collections.min(servers, comparator);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.selector;
|
||||||
|
|
||||||
|
import com.google.common.collect.Iterators;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
|
||||||
|
{
|
||||||
|
Random random;
|
||||||
|
|
||||||
|
public RandomServerSelectorStrategy() {
|
||||||
|
this.random = new Random();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
|
||||||
|
{
|
||||||
|
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,24 +31,18 @@ import java.util.Set;
|
||||||
*/
|
*/
|
||||||
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
||||||
{
|
{
|
||||||
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public int compare(QueryableDruidServer left, QueryableDruidServer right)
|
|
||||||
{
|
|
||||||
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
|
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
|
||||||
|
|
||||||
private final DataSegment segment;
|
private final DataSegment segment;
|
||||||
|
private final ServerSelectorStrategy strategy;
|
||||||
|
|
||||||
public ServerSelector(
|
public ServerSelector(
|
||||||
DataSegment segment
|
DataSegment segment,
|
||||||
|
ServerSelectorStrategy strategy
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.segment = segment;
|
this.segment = segment;
|
||||||
|
this.strategy = strategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataSegment getSegment()
|
public DataSegment getSegment()
|
||||||
|
@ -86,7 +80,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
|
||||||
switch (size) {
|
switch (size) {
|
||||||
case 0: return null;
|
case 0: return null;
|
||||||
case 1: return servers.iterator().next();
|
case 1: return servers.iterator().next();
|
||||||
default: return Collections.min(servers, comparator);
|
default: return strategy.pick(servers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.selector;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
|
||||||
|
@JsonSubTypes(value = {
|
||||||
|
@JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class),
|
||||||
|
@JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class)
|
||||||
|
})
|
||||||
|
public interface ServerSelectorStrategy
|
||||||
|
{
|
||||||
|
public QueryableDruidServer pick(Set<QueryableDruidServer> servers);
|
||||||
|
}
|
|
@ -74,7 +74,8 @@ public class ServerSelectorTest
|
||||||
new NoneShardSpec(),
|
new NoneShardSpec(),
|
||||||
0,
|
0,
|
||||||
0L
|
0L
|
||||||
)
|
),
|
||||||
|
new ConnectionCountServerSelectorStrategy()
|
||||||
);
|
);
|
||||||
|
|
||||||
DirectDruidClient client1 = new DirectDruidClient(
|
DirectDruidClient client1 = new DirectDruidClient(
|
||||||
|
|
|
@ -30,6 +30,7 @@ import io.druid.client.TimelineServerView;
|
||||||
import io.druid.client.cache.Cache;
|
import io.druid.client.cache.Cache;
|
||||||
import io.druid.client.cache.CacheMonitor;
|
import io.druid.client.cache.CacheMonitor;
|
||||||
import io.druid.client.cache.CacheProvider;
|
import io.druid.client.cache.CacheProvider;
|
||||||
|
import io.druid.client.selector.ServerSelectorStrategy;
|
||||||
import io.druid.curator.discovery.DiscoveryModule;
|
import io.druid.curator.discovery.DiscoveryModule;
|
||||||
import io.druid.guice.Jerseys;
|
import io.druid.guice.Jerseys;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
@ -81,6 +82,8 @@ public class CliBroker extends ServerRunnable
|
||||||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
||||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
|
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
|
||||||
|
|
||||||
|
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||||
|
|
||||||
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
|
||||||
|
|
||||||
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
||||||
|
|
Loading…
Reference in New Issue