Merge pull request #142 from metamx/improve-bard

Enable the broker to select the compute node with the least number of open connections
cheddar committed 2013-05-14 15:18:04 -07:00
commit 86415e70ea
22 changed files with 244 additions and 53 deletions
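
The heart of the change: instead of picking a random server for each segment, the broker's ServerSelector now picks the QueryableDruidServer whose DirectDruidClient reports the fewest open connections. A minimal, self-contained sketch of that strategy (the ConnectionCounted interface and class names here are illustrative stand-ins, not part of the patch):

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class LeastConnectionsSketch
{
  // Stand-in for QueryableDruidServer/DirectDruidClient (illustrative only).
  interface ConnectionCounted
  {
    int getNumOpenConnections();
  }

  // Mirrors the comparator added to ServerSelector below: order servers by
  // the number of queries currently in flight and take the minimum.
  static ConnectionCounted pick(List<? extends ConnectionCounted> servers)
  {
    return Collections.min(
        servers,
        new Comparator<ConnectionCounted>()
        {
          @Override
          public int compare(ConnectionCounted left, ConnectionCounted right)
          {
            return Integer.compare(left.getNumOpenConnections(), right.getNumOpenConnections());
          }
        }
    );
  }

  public static void main(String[] args)
  {
    ConnectionCounted busy = new ConnectionCounted()
    {
      @Override
      public int getNumOpenConnections() { return 3; }
    };
    ConnectionCounted idle = new ConnectionCounted()
    {
      @Override
      public int getNumOpenConnections() { return 0; }
    };
    System.out.println(pick(Arrays.asList(busy, idle)) == idle); // prints true
  }
}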

View File: BrokerServerView.java

@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.VersionedIntervalTimeline;
+import com.metamx.druid.client.selector.QueryableDruidServer;
 import com.metamx.druid.client.selector.ServerSelector;
 import com.metamx.druid.partition.PartitionChunk;
 import com.metamx.druid.query.QueryRunner;
@@ -44,7 +45,7 @@ public class BrokerServerView implements TimelineServerView
   private final Object lock = new Object();

-  private final ConcurrentMap<DruidServer, DirectDruidClient> clients;
+  private final ConcurrentMap<String, QueryableDruidServer> clients;
   private final Map<String, ServerSelector> selectors;
   private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
@@ -107,7 +108,7 @@ public class BrokerServerView implements TimelineServerView
   public void clear()
   {
     synchronized (lock) {
-      final Iterator<DruidServer> clientsIter = clients.keySet().iterator();
+      final Iterator<String> clientsIter = clients.keySet().iterator();
       while (clientsIter.hasNext()) {
         clientsIter.remove();
       }
@@ -119,7 +120,7 @@ public class BrokerServerView implements TimelineServerView
       final ServerSelector selector = selectorsIter.next();
       selectorsIter.remove();
       while (!selector.isEmpty()) {
-        final DruidServer pick = selector.pick();
+        final QueryableDruidServer pick = selector.pick();
         selector.removeServer(pick);
       }
     }
@@ -128,7 +129,10 @@ public class BrokerServerView implements TimelineServerView
   private void addServer(DruidServer server)
   {
-    QueryRunner exists = clients.put(server, makeDirectClient(server));
+    QueryableDruidServer exists = clients.put(
+        server.getName(),
+        new QueryableDruidServer(server, makeDirectClient(server))
+    );
     if (exists != null) {
       log.warn("QueryRunner for server[%s] already existed!?", server);
     }
@@ -141,7 +145,7 @@ public class BrokerServerView implements TimelineServerView
   private void removeServer(DruidServer server)
   {
-    clients.remove(server);
+    clients.remove(server.getName());
     for (DataSegment segment : server.getSegments().values()) {
       serverRemovedSegment(server, segment);
     }
@@ -167,10 +171,10 @@ public class BrokerServerView implements TimelineServerView
         selectors.put(segmentId, selector);
       }

-      if (!clients.containsKey(server)) {
+      if (!clients.containsKey(server.getName())) {
         addServer(server);
       }
-      selector.addServer(server);
+      selector.addServer(clients.get(server.getName()));
     }
   }
@@ -188,7 +192,8 @@ public class BrokerServerView implements TimelineServerView
         return;
       }

-      if (!selector.removeServer(server)) {
+      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
+      if (!selector.removeServer(queryableDruidServer)) {
         log.warn(
             "Asked to disassociate non-existant association between server[%s] and segment[%s]",
             server,
@@ -228,7 +233,11 @@ public class BrokerServerView implements TimelineServerView
   public <T> QueryRunner<T> getQueryRunner(DruidServer server)
   {
     synchronized (lock) {
-      return clients.get(server);
+      QueryableDruidServer queryableDruidServer = clients.get(server.getName());
+      if (queryableDruidServer == null) {
+        log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
+      }
+      return queryableDruidServer.getClient();
     }
   }

View File: CachingClusteredClient.java

@@ -203,7 +203,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       // Compile list of all segments not pulled from cache
       for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
-        final DruidServer server = segment.lhs.pick();
+        final DruidServer server = segment.lhs.pick().getServer();
         List<SegmentDescriptor> descriptors = serverSegments.get(server);
         if (descriptors == null) {

View File: DirectDruidClient.java

@@ -47,7 +47,6 @@ import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.io.AppendableByteArrayInputStream;
 import com.metamx.http.client.response.ClientResponse;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-import org.jboss.netty.handler.codec.http.HttpChunk;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -60,6 +59,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  */
@@ -74,6 +74,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
   private final HttpClient httpClient;
   private final String host;
+  private final AtomicInteger openConnections;
   private final boolean isSmile;

   public DirectDruidClient(
@@ -88,7 +89,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     this.httpClient = httpClient;
     this.host = host;
-    isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory;
+    this.isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory;
+    this.openConnections = new AtomicInteger();
+  }
+
+  public int getNumOpenConnections()
+  {
+    return openConnections.get();
   }

   @Override
@@ -121,6 +128,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     try {
       log.debug("Querying url[%s]", url);
+      openConnections.getAndIncrement();
       future = httpClient
           .post(new URL(url))
           .setContent(objectMapper.writeValueAsBytes(query))
@@ -128,7 +136,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           .go(
               new InputStreamResponseHandler()
               {
-
                 long startTime;
                 long byteCount = 0;
@@ -162,6 +169,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
                       stopTime - startTime,
                       byteCount / (0.0001 * (stopTime - startTime))
                   );
+                  openConnections.getAndDecrement();
                   return super.done(clientResponse);
                 }
               }
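
DirectDruidClient now tracks in-flight queries with an AtomicInteger: it is incremented just before the HTTP POST is issued and decremented in the response handler's done() callback. The gauge pattern in isolation (class and method names here are hypothetical, not from the patch):

import java.util.concurrent.atomic.AtomicInteger;

class ConnectionGauge
{
  private final AtomicInteger openConnections = new AtomicInteger();

  // What ServerSelector's comparator reads.
  public int getNumOpenConnections()
  {
    return openConnections.get();
  }

  // Called just before the HTTP POST goes out.
  public void requestIssued()
  {
    openConnections.getAndIncrement();
  }

  // Called from the response handler's done() callback, once the
  // response has been fully consumed.
  public void responseDone()
  {
    openConnections.getAndDecrement();
  }
}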

View File: QueryableDruidServer.java (new file)

@@ -0,0 +1,47 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.client.selector;
+
+import com.metamx.druid.client.DirectDruidClient;
+import com.metamx.druid.client.DruidServer;
+
+/**
+ */
+public class QueryableDruidServer
+{
+  private final DruidServer server;
+  private final DirectDruidClient client;
+
+  public QueryableDruidServer(DruidServer server, DirectDruidClient client)
+  {
+    this.server = server;
+    this.client = client;
+  }
+
+  public DruidServer getServer()
+  {
+    return server;
+  }
+
+  public DirectDruidClient getClient()
+  {
+    return client;
+  }
+}

View File: ServerSelector.java

@@ -19,21 +19,30 @@
 package com.metamx.druid.client.selector;

-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;

-import java.util.LinkedHashSet;
-import java.util.Random;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Set;

 /**
 */
 public class ServerSelector
 {
-  private static final Random random = new Random();
+  private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
+  {
+    @Override
+    public int compare(QueryableDruidServer left, QueryableDruidServer right)
+    {
+      return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
+    }
+  };

-  private final LinkedHashSet<DruidServer> servers = Sets.newLinkedHashSet();
+  private final Set<QueryableDruidServer> servers = Sets.newHashSet();
+
   private final DataSegment segment;
@@ -49,7 +58,7 @@ public class ServerSelector
   }

   public void addServer(
-      DruidServer server
+      QueryableDruidServer server
   )
   {
     synchronized (this) {
@@ -57,7 +66,7 @@ public class ServerSelector
     }
   }

-  public boolean removeServer(DruidServer server)
+  public boolean removeServer(QueryableDruidServer server)
   {
     synchronized (this) {
       return servers.remove(server);
@@ -71,15 +80,10 @@ public class ServerSelector
     }
   }

-  public DruidServer pick()
+  public QueryableDruidServer pick()
   {
     synchronized (this) {
-      final int size = servers.size();
-      switch (size) {
-        case 0: return null;
-        case 1: return servers.iterator().next();
-        default: return Iterables.get(servers, random.nextInt(size));
-      }
+      return Collections.min(servers, comparator);
     }
   }
 }
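
One behavioral detail of the new pick(): Collections.min throws NoSuchElementException on an empty collection, whereas the old switch returned null for size 0. The clear() loop in BrokerServerView only calls pick() while the selector is non-empty, so it is unaffected. A standalone illustration:

import java.util.Collections;
import java.util.HashSet;

class EmptyMinDemo
{
  public static void main(String[] args)
  {
    // Throws java.util.NoSuchElementException: Collections.min is undefined
    // for an empty collection, unlike the old "case 0: return null" branch.
    Collections.min(new HashSet<Integer>());
  }
}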

View File: ServerSelectorTest.java (new file)

@@ -0,0 +1,121 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.client.selector;
+
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.SettableFuture;
+import com.metamx.druid.Druids;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DirectDruidClient;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
+import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
+import com.metamx.druid.shard.NoneShardSpec;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.RequestBuilder;
+import junit.framework.Assert;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URL;
+
+/**
+ */
+public class ServerSelectorTest
+{
+  private HttpClient httpClient;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    httpClient = EasyMock.createMock(HttpClient.class);
+  }
+
+  @Test
+  public void testPick() throws Exception
+  {
+    RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
+    EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
+    EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
+    EasyMock.replay(httpClient);
+
+    final ServerSelector serverSelector = new ServerSelector(
+        new DataSegment(
+            "test",
+            new Interval("2013-01-01/2013-01-02"),
+            new DateTime("2013-01-01").toString(),
+            Maps.<String, Object>newHashMap(),
+            Lists.<String>newArrayList(),
+            Lists.<String>newArrayList(),
+            new NoneShardSpec(),
+            0,
+            0L
+        )
+    );
+
+    DirectDruidClient client1 = new DirectDruidClient(
+        new ReflectionQueryToolChestWarehouse(),
+        new DefaultObjectMapper(new SmileFactory()),
+        httpClient,
+        "foo"
+    );
+    DirectDruidClient client2 = new DirectDruidClient(
+        new ReflectionQueryToolChestWarehouse(),
+        new DefaultObjectMapper(new SmileFactory()),
+        httpClient,
+        "foo2"
+    );
+
+    QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
+        null,
+        client1
+    );
+    serverSelector.addServer(queryableDruidServer1);
+    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
+        null,
+        client2
+    );
+    serverSelector.addServer(queryableDruidServer2);
+
+    TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
+
+    client1.run(query);
+    client1.run(query);
+    client1.run(query);
+
+    Assert.assertTrue(client1.getNumOpenConnections() == 3);
+
+    client2.run(query);
+    client2.run(query);
+
+    Assert.assertTrue(client2.getNumOpenConnections() == 2);
+
+    Assert.assertTrue(serverSelector.pick() == queryableDruidServer2);
+
+    EasyMock.verify(httpClient);
+  }
+}

View File: ColumnDescriptor.java

@@ -74,9 +74,9 @@ public class ColumnDescriptor
     return parts;
   }

-  public int numBytes()
+  public long numBytes()
   {
-    int retVal = 0;
+    long retVal = 0;
     for (ColumnPartSerde part : parts) {
       retVal += part.numBytes();

View File: ColumnPartSerde.java

@@ -38,7 +38,7 @@ import java.nio.channels.WritableByteChannel;
 })
 public interface ColumnPartSerde
 {
-  public int numBytes();
+  public long numBytes();
   public void write(WritableByteChannel channel) throws IOException;
   public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder);
 }

View File: ComplexColumnPartSerde.java

@@ -61,7 +61,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
   }

   @Override
-  public int numBytes()
+  public long numBytes()
   {
     return column.getSerializedSize();
   }

View File: DictionaryEncodedColumnPartSerde.java

@@ -49,7 +49,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
   private final VSizeIndexed multiValuedColumn;
   private final GenericIndexed<ImmutableConciseSet> bitmaps;

-  private final int size;
+  private final long size;

   public DictionaryEncodedColumnPartSerde(
       GenericIndexed<String> dictionary,
@@ -63,7 +63,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
     this.multiValuedColumn = multiValCol;
     this.bitmaps = bitmaps;

-    int size = dictionary.getSerializedSize();
+    long size = dictionary.getSerializedSize();
     if (singleValCol != null && multiValCol == null) {
       size += singleValCol.getSerializedSize();
     }
@@ -94,7 +94,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
   }

   @Override
-  public int numBytes()
+  public long numBytes()
   {
     return 1 + size;
   }

View File: FloatGenericColumnPartSerde.java

@@ -58,7 +58,7 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde
   }

   @Override
-  public int numBytes()
+  public long numBytes()
   {
     return compressedFloats.getSerializedSize();
   }

View File: LongGenericColumnPartSerde.java

@@ -58,7 +58,7 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde
   }

   @Override
-  public int numBytes()
+  public long numBytes()
   {
     return compressedLongs.getSerializedSize();
   }

View File: CompressedFloatsIndexedSupplier.java

@@ -150,7 +150,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
     };
   }

-  public int getSerializedSize()
+  public long getSerializedSize()
   {
     return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4;
   }

View File: CompressedLongsIndexedSupplier.java

@@ -161,7 +161,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
     };
   }

-  public int getSerializedSize()
+  public long getSerializedSize()
   {
     return baseLongBuffers.getSerializedSize() + 1 + 4 + 4;
   }

View File: IndexIO.java

@@ -630,7 +630,7 @@ public class IndexIO
     GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);

-    final int numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16;
+    final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16;
     final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
     cols.writeToChannel(writer);
     dims9.writeToChannel(writer);

View File: GenericIndexed.java

@@ -207,7 +207,7 @@ public class GenericIndexed<T> implements Indexed<T>
     return -(minIndex + 1);
   }

-  public int getSerializedSize()
+  public long getSerializedSize()
   {
     return theBuffer.remaining() + 2 + 4 + 4;
   }

View File: ThreadPoolTaskRunner.java (renamed from ExecutorServiceTaskRunner.java)

@@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.Query;
 import com.metamx.druid.merger.common.TaskStatus;
@@ -51,20 +50,19 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;

 /**
  * Runs tasks in a JVM thread using an ExecutorService.
  */
-public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
+public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
 {
   private final TaskToolboxFactory toolboxFactory;
   private final ListeningExecutorService exec;
   private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();

-  private static final EmittingLogger log = new EmittingLogger(ExecutorServiceTaskRunner.class);
+  private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);

-  public ExecutorServiceTaskRunner(
+  public ThreadPoolTaskRunner(
       TaskToolboxFactory toolboxFactory,
       ExecutorService exec
   )

View File: ExecutorNode.java

@@ -59,7 +59,7 @@ import com.metamx.druid.merger.common.config.TaskConfig;
 import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
 import com.metamx.druid.merger.common.index.ChatHandlerProvider;
 import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
-import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
+import com.metamx.druid.merger.coordinator.ThreadPoolTaskRunner;
 import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
 import com.metamx.druid.merger.worker.config.WorkerConfig;
 import com.metamx.druid.utils.PropUtils;
@@ -118,7 +118,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
   private ServiceAnnouncer serviceAnnouncer = null;
   private ServiceProvider coordinatorServiceProvider = null;
   private Server server = null;
-  private ExecutorServiceTaskRunner taskRunner = null;
+  private ThreadPoolTaskRunner taskRunner = null;
   private ExecutorLifecycle executorLifecycle = null;
   private ChatHandlerProvider chatHandlerProvider = null;
@@ -247,7 +247,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
     executorLifecycle.join();
   }

-  public ExecutorServiceTaskRunner getTaskRunner()
+  public ThreadPoolTaskRunner getTaskRunner()
   {
     return taskRunner;
   }
@@ -414,7 +414,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
   {
     if (taskRunner == null) {
       this.taskRunner = lifecycle.addManagedInstance(
-          new ExecutorServiceTaskRunner(
+          new ThreadPoolTaskRunner(
               taskToolboxFactory,
               Executors.newSingleThreadExecutor(
                   new ThreadFactoryBuilder()

View File: RemoteTaskRunnerTest.java

@@ -291,7 +291,7 @@ public class RemoteTaskRunnerTest
         new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
         cf,
         workerCuratorCoordinator,
-        new ExecutorServiceTaskRunner(
+        new ThreadPoolTaskRunner(
             new TaskToolboxFactory(
                 new TaskConfig()
                 {

View File: TaskLifecycleTest.java

@@ -162,7 +162,7 @@ public class TaskLifecycleTest
         new DefaultObjectMapper()
     );

-    tr = new ExecutorServiceTaskRunner(
+    tr = new ThreadPoolTaskRunner(
         tb,
         Executors.newSingleThreadExecutor()
     );

View File: ServerManager.java

@@ -20,6 +20,7 @@
 package com.metamx.druid.coordination;

 import com.google.common.base.Function;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Ordering;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
@@ -257,6 +258,9 @@ public class ServerManager implements QuerySegmentWalker
                     );
                   }
                 }
+            )
+            .filter(
+                Predicates.<QueryRunner<T>>notNull()
             );

     return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
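
The added filter drops null QueryRunners from the iterable before merging. The same idiom expressed with plain Guava (FunctionalIterable is Metamarkets' wrapper; this sketch uses Guava's Iterables and illustrative values):

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;

import java.util.Arrays;
import java.util.List;

class NotNullFilterDemo
{
  public static void main(String[] args)
  {
    List<String> runners = Arrays.asList("runnerA", null, "runnerB");
    // Lazily skips null elements, like the .filter(Predicates.notNull()) above.
    Iterable<String> usable = Iterables.filter(runners, Predicates.notNull());
    for (String runner : usable) {
      System.out.println(runner); // prints runnerA, then runnerB
    }
  }
}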

View File: DruidMasterConfig.java

@@ -51,13 +51,13 @@ public abstract class DruidMasterConfig
   @Config("druid.master.merger.on")
   public boolean isMergeSegments()
   {
-    return true;
+    return false;
   }

   @Config("druid.master.conversion.on")
   public boolean isConvertSegments()
   {
-    return true;
+    return false;
   }

   @Config("druid.master.merger.service")