Merge pull request #353 from metamx/fix-request-balancing

Fix request balancing
This commit is contained in:
fjy 2014-01-09 16:06:06 -08:00
commit d33aba728a
11 changed files with 160 additions and 24 deletions

View File

@ -35,9 +35,10 @@ JVM Configuration
The broker module uses several of the default modules in [Configuration](Configuration.html) and has the following set of configurations as well:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.cache.type`|Choices: local, memcache. The type of cache to use for queries.|local|
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcache`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` choose randomly, `connectionCount` picks the node with the fewest number of active connections to|`random`|
#### Local Cache
@ -76,4 +77,4 @@ Caching
-------
Broker nodes employ a cache with a LRU cache invalidation strategy. The broker cache stores per segment results. The cache can be local to each broker node or shared across multiple nodes using an external distributed cache such as [memcached](http://memcached.org/). Each time a broker node receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache and the results can be directly pulled from the cache. For any segment results that do not exist in the cache, the broker node will forward the query to the
historical nodes. Once the historical nodes return their results, the broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.
historical nodes. Once the historical nodes return their results, the broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.

View File

@ -78,7 +78,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.8.4</version>
<version>0.8.5</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -25,8 +25,9 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.QueryRunner;
@ -57,19 +58,22 @@ public class BrokerServerView implements TimelineServerView
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerView baseView;
private final ServerSelectorStrategy serverSelectorStrategy;
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehose,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerView baseView
ServerView baseView,
ServerSelectorStrategy serverSelectorStrategy
)
{
this.warehose = warehose;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
this.serverSelectorStrategy = serverSelectorStrategy;
this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap();
@ -164,7 +168,7 @@ public class BrokerServerView implements TimelineServerView
ServerSelector selector = selectors.get(segmentId);
if (selector == null) {
selector = new ServerSelector(segment);
selector = new ServerSelector(segment, serverSelectorStrategy);
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {

View File

@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.cache.Cache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;

View File

@ -29,6 +29,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
@ -123,12 +126,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
typeRef = types.lhs;
}
final Future<InputStream> future;
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
openConnections.getAndIncrement();
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
@ -169,11 +171,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime))
);
openConnections.getAndDecrement();
return super.done(clientResponse);
}
}
);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Collections.min(servers, comparator);
}
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.collect.Iterators;
import java.util.Random;
import java.util.Set;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
}
}

View File

@ -31,24 +31,18 @@ import java.util.Set;
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment;
private final ServerSelectorStrategy strategy;
public ServerSelector(
DataSegment segment
DataSegment segment,
ServerSelectorStrategy strategy
)
{
this.segment = segment;
this.strategy = strategy;
}
public DataSegment getSegment()
@ -86,7 +80,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Collections.min(servers, comparator);
default: return strategy.pick(servers);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class),
@JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class)
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers);
}

View File

@ -74,7 +74,8 @@ public class ServerSelectorTest
new NoneShardSpec(),
0,
0L
)
),
new ConnectionCountServerSelectorStrategy()
);
DirectDruidClient client1 = new DirectDruidClient(

View File

@ -30,6 +30,7 @@ import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheMonitor;
import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
@ -81,6 +82,8 @@ public class CliBroker extends ServerRunnable
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);