Merge branch 'master' into igalDruid

Igal Levy 2014-01-10 16:19:20 -08:00
commit 917b4ea5ce
13 changed files with 206 additions and 38 deletions

View File

@@ -35,9 +35,10 @@ JVM Configuration
The broker module uses several of the default modules in [Configuration](Configuration.html) and has the following set of configurations as well:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.cache.type`|Choices: local, memcache. The type of cache to use for queries.|local|
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcache`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` chooses a node at random; `connectionCount` picks the node with the fewest active connections (see the example below).|`random`|
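As a quick illustration of how these two settings sit together in a broker's `runtime.properties` (the values below are illustrative choices, not defaults beyond what the table states):

```properties
# Hypothetical broker runtime.properties excerpt
druid.broker.cache.type=local
druid.broker.balancer.type=connectionCount
```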
#### Local Cache
@@ -76,4 +77,4 @@ Caching
-------
Broker nodes employ a cache with an LRU cache-invalidation strategy. The broker cache stores per-segment results. The cache can be local to each broker node or shared across multiple nodes using an external distributed cache such as [memcached](http://memcached.org/). Each time a broker node receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache, and those results can be pulled directly from it. For any segment results that are not in the cache, the broker node forwards the query to the historical nodes. Once the historical nodes return their results, the broker stores those results in the cache. Real-time segments are never cached, so requests for real-time data are always forwarded to real-time nodes; real-time data is perpetually changing, and caching its results would be unreliable.
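A rough sketch of that per-segment flow (the class and the in-memory map below are illustrative stand-ins, not Druid's actual cache or query types): only the cache misses are forwarded, and their results back-fill the cache.

```java
// Illustrative sketch of the broker's per-segment cache flow; not actual Druid code.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerSegmentCacheSketch
{
  private final Map<String, String> cache = new HashMap<>(); // stand-in for the local/memcached cache

  public List<String> run(List<String> segmentIds)
  {
    List<String> results = new ArrayList<>();
    List<String> misses = new ArrayList<>();
    for (String segmentId : segmentIds) {
      String cached = cache.get(segmentId);
      if (cached != null) {
        results.add(cached);   // served directly from the cache
      } else {
        misses.add(segmentId); // must be forwarded to historical nodes
      }
    }
    for (String segmentId : misses) {
      String fresh = queryHistorical(segmentId);
      cache.put(segmentId, fresh); // back-fill the cache for next time
      results.add(fresh);
    }
    return results;
  }

  private String queryHistorical(String segmentId)
  {
    return "result-for-" + segmentId; // placeholder for the real historical query
  }
}
```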

View File

@@ -86,7 +86,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
if (chatHandlerProvider.isPresent()) {
log.info("Found chathandler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(serviceName, firehose);
if (serviceName.contains(":")) {
chatHandlerProvider.get().register(serviceName.replaceAll(".*:", ""), firehose); // rofl
}
} else {
log.info("No chathandler detected");
}
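For context on the guarded registration above: the `replaceAll(".*:", "")` call strips everything up to the last colon, so a firehose registered under a prefixed service name is also reachable under the bare suffix. A tiny sketch (the service name here is made up for illustration):

```java
public class ServiceNameSuffixSketch
{
  public static void main(String[] args)
  {
    // Hypothetical service name; only the regex behavior of the registration above is illustrated.
    String serviceName = "druid:firehose:wikipedia";
    if (serviceName.contains(":")) {
      String suffix = serviceName.replaceAll(".*:", ""); // greedy ".*:" consumes up to the last ':'
      System.out.println(suffix);                        // prints "wikipedia"
    }
  }
}
```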

View File

@@ -78,7 +78,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.8.4</version>
<version>0.8.5</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@@ -25,8 +25,9 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.QueryRunner;
@@ -57,19 +58,22 @@ public class BrokerServerView implements TimelineServerView
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerView baseView;
private final ServerSelectorStrategy serverSelectorStrategy;
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehose,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerView baseView
ServerView baseView,
ServerSelectorStrategy serverSelectorStrategy
)
{
this.warehose = warehose;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
this.serverSelectorStrategy = serverSelectorStrategy;
this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap();
@@ -164,7 +168,7 @@ public class BrokerServerView implements TimelineServerView
ServerSelector selector = selectors.get(segmentId);
if (selector == null) {
selector = new ServerSelector(segment);
selector = new ServerSelector(segment, serverSelectorStrategy);
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {

View File

@@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.cache.Cache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;

View File

@@ -29,6 +29,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
@@ -123,12 +126,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
typeRef = types.lhs;
}
final Future<InputStream> future;
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
openConnections.getAndIncrement();
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
@@ -169,11 +171,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime))
);
openConnections.getAndDecrement();
return super.done(clientResponse);
}
}
);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
}
catch (IOException e) {
throw Throwables.propagate(e);
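The accounting change above boils down to: increment the counter when the request is handed off, and decrement it exactly once when the returned future completes, whether it succeeds or fails. A self-contained sketch of that pattern with the two-argument `Futures.addCallback` available in the Guava of this era (the executor and fake request handling are placeholders, not Druid code):

```java
// Sketch of the increment-on-submit / decrement-on-completion pattern used above.
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class OpenConnectionCounterSketch
{
  private final AtomicInteger openConnections = new AtomicInteger();
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

  public ListenableFuture<String> submit(final String request)
  {
    openConnections.getAndIncrement(); // one more in-flight request
    ListenableFuture<String> future = exec.submit(new Callable<String>()
    {
      @Override
      public String call()
      {
        return "response for " + request; // placeholder for the real HTTP round trip
      }
    });
    Futures.addCallback(future, new FutureCallback<String>()
    {
      @Override
      public void onSuccess(String result) { openConnections.getAndDecrement(); }

      @Override
      public void onFailure(Throwable t) { openConnections.getAndDecrement(); } // also on timeouts/errors
    });
    return future;
  }

  public int getNumOpenConnections() { return openConnections.get(); }
}
```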

View File

@@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Collections.min(servers, comparator);
}
}
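To make the `connectionCount` rule concrete, here is a toy illustration (the `Host` class and the integer counts stand in for `QueryableDruidServer` and `getNumOpenConnections()`; this is not Druid code): the least-loaded server wins.

```java
// Toy illustration of "pick the server with the fewest open connections".
import com.google.common.primitives.Ints;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class MinConnectionsSketch
{
  static class Host
  {
    final String name;
    final int openConnections;

    Host(String name, int openConnections)
    {
      this.name = name;
      this.openConnections = openConnections;
    }
  }

  public static void main(String[] args)
  {
    List<Host> hosts = Arrays.asList(new Host("historical-1", 3), new Host("historical-2", 1));
    Host picked = Collections.min(hosts, new Comparator<Host>()
    {
      @Override
      public int compare(Host left, Host right)
      {
        return Ints.compare(left.openConnections, right.openConnections);
      }
    });
    System.out.println(picked.name); // historical-2, the least loaded server
  }
}
```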

View File

@@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.collect.Iterators;
import java.util.Random;
import java.util.Set;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
}
}

View File

@@ -31,24 +31,18 @@ import java.util.Set;
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment;
private final ServerSelectorStrategy strategy;
public ServerSelector(
DataSegment segment
DataSegment segment,
ServerSelectorStrategy strategy
)
{
this.segment = segment;
this.strategy = strategy;
}
public DataSegment getSegment()
@@ -86,7 +80,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Collections.min(servers, comparator);
default: return strategy.pick(servers);
}
}
}
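Putting the pieces of this commit together: a broker-side caller now constructs a `ServerSelector` with whichever strategy was injected. A minimal sketch, assuming an already-built `DataSegment` and leaning only on the constructor and `pick()` delegation shown in this diff:

```java
// Sketch of wiring a strategy into a ServerSelector; the strategy is normally provided by Guice.
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.timeline.DataSegment;

public class SelectorWiringSketch
{
  public QueryableDruidServer pickFor(DataSegment segment)
  {
    ServerSelectorStrategy strategy = new ConnectionCountServerSelectorStrategy();
    ServerSelector selector = new ServerSelector(segment, strategy);
    // Returns null until servers are registered; with several servers it delegates to strategy.pick(...).
    return selector.pick();
  }
}
```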

View File

@@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class),
@JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class)
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers);
}
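The annotations above are what let `druid.broker.balancer.type` select an implementation: Jackson resolves the `type` name to a subtype and falls back to the random default when no type is given. A minimal sketch using Jackson directly rather than Druid's config binding:

```java
// Sketch: how Jackson maps the "type" field onto a strategy implementation.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.RandomServerSelectorStrategy;
import io.druid.client.selector.ServerSelectorStrategy;

public class StrategyConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    ServerSelectorStrategy byCount =
        mapper.readValue("{\"type\": \"connectionCount\"}", ServerSelectorStrategy.class);
    System.out.println(byCount instanceof ConnectionCountServerSelectorStrategy); // true

    ServerSelectorStrategy fallback =
        mapper.readValue("{}", ServerSelectorStrategy.class);
    System.out.println(fallback instanceof RandomServerSelectorStrategy); // true, via defaultImpl
  }
}
```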

View File

@@ -128,6 +128,7 @@ public class QueryResource
.setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);

View File

@@ -17,35 +17,41 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
package io.druid.client;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import io.druid.client.DirectDruidClient;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
/**
*/
public class ServerSelectorTest
public class DirectDruidClientTest
{
private HttpClient httpClient;
@@ -56,10 +62,16 @@ public class ServerSelectorTest
}
@Test
public void testPick() throws Exception
public void testRun() throws Exception
{
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
EasyMock.replay(httpClient);
@@ -74,18 +86,19 @@ public class ServerSelectorTest
new NoneShardSpec(),
0,
0L
)
),
new ConnectionCountServerSelectorStrategy()
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo2"
);
@@ -103,11 +116,28 @@ public class ServerSelectorTest
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
client1.run(query);
client1.run(query);
client1.run(query);
Sequence s1 = client1.run(query);
Assert.assertEquals(1, client1.getNumOpenConnections());
Assert.assertTrue(client1.getNumOpenConnections() == 3);
// simulate read timeout
Sequence s2 = client1.run(query);
Assert.assertEquals(2, client1.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
Assert.assertEquals(1, client1.getNumOpenConnections());
// subsequent connections should work
Sequence s3 = client1.run(query);
Sequence s4 = client1.run(query);
Sequence s5 = client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 4);
// produce result for first connection
futureResult.set(new ByteArrayInputStream("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]".getBytes()));
List<Result> results = Sequences.toList(s1, Lists.<Result>newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
Assert.assertEquals(3, client1.getNumOpenConnections());
client2.run(query);
client2.run(query);

View File

@@ -30,6 +30,7 @@ import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheMonitor;
import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
@@ -81,6 +82,8 @@ public class CliBroker extends ServerRunnable
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);