mirror of https://github.com/apache/druid.git
first commit
This commit is contained in:
parent
8388d0593a
commit
6917ff02d8
|
@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
|
|||
import com.google.common.collect.Ordering;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.VersionedIntervalTimeline;
|
||||
import com.metamx.druid.client.selector.QueryableDruidServer;
|
||||
import com.metamx.druid.client.selector.ServerSelector;
|
||||
import com.metamx.druid.partition.PartitionChunk;
|
||||
import com.metamx.druid.query.QueryRunner;
|
||||
|
@ -44,7 +45,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final ConcurrentMap<DruidServer, DirectDruidClient> clients;
|
||||
private final ConcurrentMap<String, QueryableDruidServer> clients;
|
||||
private final Map<String, ServerSelector> selectors;
|
||||
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
|
||||
|
||||
|
@ -107,7 +108,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
public void clear()
|
||||
{
|
||||
synchronized (lock) {
|
||||
final Iterator<DruidServer> clientsIter = clients.keySet().iterator();
|
||||
final Iterator<String> clientsIter = clients.keySet().iterator();
|
||||
while (clientsIter.hasNext()) {
|
||||
clientsIter.remove();
|
||||
}
|
||||
|
@ -119,7 +120,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
final ServerSelector selector = selectorsIter.next();
|
||||
selectorsIter.remove();
|
||||
while (!selector.isEmpty()) {
|
||||
final DruidServer pick = selector.pick();
|
||||
final QueryableDruidServer pick = selector.pick();
|
||||
selector.removeServer(pick);
|
||||
}
|
||||
}
|
||||
|
@ -128,7 +129,10 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
private void addServer(DruidServer server)
|
||||
{
|
||||
QueryRunner exists = clients.put(server, makeDirectClient(server));
|
||||
QueryableDruidServer exists = clients.put(
|
||||
server.getName(),
|
||||
new QueryableDruidServer(server, makeDirectClient(server))
|
||||
);
|
||||
if (exists != null) {
|
||||
log.warn("QueryRunner for server[%s] already existed!?", server);
|
||||
}
|
||||
|
@ -141,7 +145,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
private void removeServer(DruidServer server)
|
||||
{
|
||||
clients.remove(server);
|
||||
clients.remove(server.getName());
|
||||
for (DataSegment segment : server.getSegments().values()) {
|
||||
serverRemovedSegment(server, segment);
|
||||
}
|
||||
|
@ -167,10 +171,10 @@ public class BrokerServerView implements TimelineServerView
|
|||
selectors.put(segmentId, selector);
|
||||
}
|
||||
|
||||
if (!clients.containsKey(server)) {
|
||||
if (!clients.containsKey(server.getName())) {
|
||||
addServer(server);
|
||||
}
|
||||
selector.addServer(server);
|
||||
selector.addServer(clients.get(server.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,7 +192,8 @@ public class BrokerServerView implements TimelineServerView
|
|||
return;
|
||||
}
|
||||
|
||||
if (!selector.removeServer(server)) {
|
||||
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
|
||||
if (!selector.removeServer(queryableDruidServer)) {
|
||||
log.warn(
|
||||
"Asked to disassociate non-existant association between server[%s] and segment[%s]",
|
||||
server,
|
||||
|
@ -228,7 +233,11 @@ public class BrokerServerView implements TimelineServerView
|
|||
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
|
||||
{
|
||||
synchronized (lock) {
|
||||
return clients.get(server);
|
||||
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
|
||||
if (queryableDruidServer == null) {
|
||||
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
|
||||
}
|
||||
return queryableDruidServer.getClient();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -203,7 +203,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
// Compile list of all segments not pulled from cache
|
||||
for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
|
||||
final DruidServer server = segment.lhs.pick();
|
||||
final DruidServer server = segment.lhs.pick().getServer();
|
||||
List<SegmentDescriptor> descriptors = serverSegments.get(server);
|
||||
|
||||
if (descriptors == null) {
|
||||
|
|
|
@ -60,6 +60,8 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -74,6 +76,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
private final HttpClient httpClient;
|
||||
private final String host;
|
||||
|
||||
private final AtomicInteger openConnections;
|
||||
private final boolean isSmile;
|
||||
|
||||
public DirectDruidClient(
|
||||
|
@ -88,7 +91,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
this.httpClient = httpClient;
|
||||
this.host = host;
|
||||
|
||||
isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory;
|
||||
this.isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory;
|
||||
this.openConnections = new AtomicInteger();
|
||||
}
|
||||
|
||||
public int getNumOpenConnections()
|
||||
{
|
||||
return openConnections.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -121,6 +130,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
|
||||
try {
|
||||
log.debug("Querying url[%s]", url);
|
||||
openConnections.getAndIncrement();
|
||||
future = httpClient
|
||||
.post(new URL(url))
|
||||
.setContent(objectMapper.writeValueAsBytes(query))
|
||||
|
@ -128,7 +138,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
.go(
|
||||
new InputStreamResponseHandler()
|
||||
{
|
||||
|
||||
long startTime;
|
||||
long byteCount = 0;
|
||||
|
||||
|
@ -162,6 +171,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
stopTime - startTime,
|
||||
byteCount / (0.0001 * (stopTime - startTime))
|
||||
);
|
||||
openConnections.getAndDecrement();
|
||||
return super.done(clientResponse);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package com.metamx.druid.client.selector;
|
||||
|
||||
import com.metamx.druid.client.DirectDruidClient;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class QueryableDruidServer
|
||||
{
|
||||
private final DruidServer server;
|
||||
private final DirectDruidClient client;
|
||||
|
||||
public QueryableDruidServer(DruidServer server, DirectDruidClient client)
|
||||
{
|
||||
this.server = server;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public DruidServer getServer()
|
||||
{
|
||||
return server;
|
||||
}
|
||||
|
||||
public DirectDruidClient getClient()
|
||||
{
|
||||
return client;
|
||||
}
|
||||
}
|
|
@ -19,21 +19,30 @@
|
|||
|
||||
package com.metamx.druid.client.selector;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerSelector
|
||||
{
|
||||
private static final Random random = new Random();
|
||||
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
|
||||
{
|
||||
@Override
|
||||
public int compare(QueryableDruidServer left, QueryableDruidServer right)
|
||||
{
|
||||
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
|
||||
}
|
||||
};
|
||||
|
||||
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
|
||||
|
||||
private final LinkedHashSet<DruidServer> servers = Sets.newLinkedHashSet();
|
||||
private final DataSegment segment;
|
||||
|
||||
public ServerSelector(
|
||||
|
@ -49,7 +58,7 @@ public class ServerSelector
|
|||
}
|
||||
|
||||
public void addServer(
|
||||
DruidServer server
|
||||
QueryableDruidServer server
|
||||
)
|
||||
{
|
||||
synchronized (this) {
|
||||
|
@ -57,7 +66,7 @@ public class ServerSelector
|
|||
}
|
||||
}
|
||||
|
||||
public boolean removeServer(DruidServer server)
|
||||
public boolean removeServer(QueryableDruidServer server)
|
||||
{
|
||||
synchronized (this) {
|
||||
return servers.remove(server);
|
||||
|
@ -71,15 +80,10 @@ public class ServerSelector
|
|||
}
|
||||
}
|
||||
|
||||
public DruidServer pick()
|
||||
public QueryableDruidServer pick()
|
||||
{
|
||||
synchronized (this) {
|
||||
final int size = servers.size();
|
||||
switch (size) {
|
||||
case 0: return null;
|
||||
case 1: return servers.iterator().next();
|
||||
default: return Iterables.get(servers, random.nextInt(size));
|
||||
}
|
||||
return Collections.min(servers, comparator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
package com.metamx.druid.client.selector;
|
||||
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.metamx.druid.Druids;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DirectDruidClient;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
|
||||
import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.RequestBuilder;
|
||||
import junit.framework.Assert;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URL;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerSelectorTest
|
||||
{
|
||||
private HttpClient httpClient;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
httpClient = EasyMock.createMock(HttpClient.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPick() throws Exception
|
||||
{
|
||||
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
|
||||
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
|
||||
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
|
||||
EasyMock.replay(httpClient);
|
||||
|
||||
final ServerSelector serverSelector = new ServerSelector(
|
||||
new DataSegment(
|
||||
"test",
|
||||
new Interval("2013-01-01/2013-01-02"),
|
||||
new DateTime("2013-01-01").toString(),
|
||||
Maps.<String, Object>newHashMap(),
|
||||
Lists.<String>newArrayList(),
|
||||
Lists.<String>newArrayList(),
|
||||
new NoneShardSpec(),
|
||||
0,
|
||||
0L
|
||||
)
|
||||
);
|
||||
|
||||
DirectDruidClient client1 = new DirectDruidClient(
|
||||
new ReflectionQueryToolChestWarehouse(),
|
||||
new DefaultObjectMapper(new SmileFactory()),
|
||||
httpClient,
|
||||
"foo"
|
||||
);
|
||||
DirectDruidClient client2 = new DirectDruidClient(
|
||||
new ReflectionQueryToolChestWarehouse(),
|
||||
new DefaultObjectMapper(new SmileFactory()),
|
||||
httpClient,
|
||||
"foo2"
|
||||
);
|
||||
|
||||
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
|
||||
null,
|
||||
client1
|
||||
);
|
||||
serverSelector.addServer(queryableDruidServer1);
|
||||
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
|
||||
null,
|
||||
client2
|
||||
);
|
||||
serverSelector.addServer(queryableDruidServer2);
|
||||
|
||||
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
|
||||
|
||||
client1.run(query);
|
||||
client1.run(query);
|
||||
client1.run(query);
|
||||
|
||||
Assert.assertTrue(client1.getNumOpenConnections() == 3);
|
||||
|
||||
client2.run(query);
|
||||
client2.run(query);
|
||||
|
||||
Assert.assertTrue(client2.getNumOpenConnections() == 2);
|
||||
|
||||
Assert.assertTrue(serverSelector.pick() == queryableDruidServer2);
|
||||
|
||||
EasyMock.verify(httpClient);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue