diff --git a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java index 54a67257692..ccb28464c6a 100644 --- a/client/src/main/java/com/metamx/druid/client/BrokerServerView.java +++ b/client/src/main/java/com/metamx/druid/client/BrokerServerView.java @@ -24,6 +24,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.metamx.common.logger.Logger; import com.metamx.druid.VersionedIntervalTimeline; +import com.metamx.druid.client.selector.QueryableDruidServer; import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.query.QueryRunner; @@ -44,7 +45,7 @@ public class BrokerServerView implements TimelineServerView private final Object lock = new Object(); - private final ConcurrentMap clients; + private final ConcurrentMap clients; private final Map selectors; private final Map> timelines; @@ -107,7 +108,7 @@ public class BrokerServerView implements TimelineServerView public void clear() { synchronized (lock) { - final Iterator clientsIter = clients.keySet().iterator(); + final Iterator clientsIter = clients.keySet().iterator(); while (clientsIter.hasNext()) { clientsIter.remove(); } @@ -119,7 +120,7 @@ public class BrokerServerView implements TimelineServerView final ServerSelector selector = selectorsIter.next(); selectorsIter.remove(); while (!selector.isEmpty()) { - final DruidServer pick = selector.pick(); + final QueryableDruidServer pick = selector.pick(); selector.removeServer(pick); } } @@ -128,7 +129,10 @@ public class BrokerServerView implements TimelineServerView private void addServer(DruidServer server) { - QueryRunner exists = clients.put(server, makeDirectClient(server)); + QueryableDruidServer exists = clients.put( + server.getName(), + new QueryableDruidServer(server, makeDirectClient(server)) + ); if (exists != null) { log.warn("QueryRunner for server[%s] already existed!?", server); } @@ -141,7 +145,7 @@ public class BrokerServerView implements TimelineServerView private void removeServer(DruidServer server) { - clients.remove(server); + clients.remove(server.getName()); for (DataSegment segment : server.getSegments().values()) { serverRemovedSegment(server, segment); } @@ -167,10 +171,10 @@ public class BrokerServerView implements TimelineServerView selectors.put(segmentId, selector); } - if (!clients.containsKey(server)) { + if (!clients.containsKey(server.getName())) { addServer(server); } - selector.addServer(server); + selector.addServer(clients.get(server.getName())); } } @@ -188,7 +192,8 @@ public class BrokerServerView implements TimelineServerView return; } - if (!selector.removeServer(server)) { + QueryableDruidServer queryableDruidServer = clients.get(server.getName()); + if (!selector.removeServer(queryableDruidServer)) { log.warn( "Asked to disassociate non-existant association between server[%s] and segment[%s]", server, @@ -228,7 +233,11 @@ public class BrokerServerView implements TimelineServerView public QueryRunner getQueryRunner(DruidServer server) { synchronized (lock) { - return clients.get(server); + QueryableDruidServer queryableDruidServer = clients.get(server.getName()); + if (queryableDruidServer == null) { + log.error("WTF?! No QueryableDruidServer found for %s", server.getName()); + } + return queryableDruidServer.getClient(); } } diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 261ed51e136..eba9b399ccd 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -203,7 +203,7 @@ public class CachingClusteredClient implements QueryRunner // Compile list of all segments not pulled from cache for(Pair segment : segments) { - final DruidServer server = segment.lhs.pick(); + final DruidServer server = segment.lhs.pick().getServer(); List descriptors = serverSegments.get(server); if (descriptors == null) { diff --git a/client/src/main/java/com/metamx/druid/client/DirectDruidClient.java b/client/src/main/java/com/metamx/druid/client/DirectDruidClient.java index 755c79d7cc3..92a4e5b9426 100644 --- a/client/src/main/java/com/metamx/druid/client/DirectDruidClient.java +++ b/client/src/main/java/com/metamx/druid/client/DirectDruidClient.java @@ -47,7 +47,6 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; - import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -60,6 +59,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -74,6 +74,7 @@ public class DirectDruidClient implements QueryRunner private final HttpClient httpClient; private final String host; + private final AtomicInteger openConnections; private final boolean isSmile; public DirectDruidClient( @@ -88,7 +89,13 @@ public class DirectDruidClient implements QueryRunner this.httpClient = httpClient; this.host = host; - isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory; + this.isSmile = this.objectMapper.getJsonFactory() instanceof SmileFactory; + this.openConnections = new AtomicInteger(); + } + + public int getNumOpenConnections() + { + return openConnections.get(); } @Override @@ -121,6 +128,7 @@ public class DirectDruidClient implements QueryRunner try { log.debug("Querying url[%s]", url); + openConnections.getAndIncrement(); future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) @@ -128,7 +136,6 @@ public class DirectDruidClient implements QueryRunner .go( new InputStreamResponseHandler() { - long startTime; long byteCount = 0; @@ -162,6 +169,7 @@ public class DirectDruidClient implements QueryRunner stopTime - startTime, byteCount / (0.0001 * (stopTime - startTime)) ); + openConnections.getAndDecrement(); return super.done(clientResponse); } } diff --git a/client/src/main/java/com/metamx/druid/client/selector/QueryableDruidServer.java b/client/src/main/java/com/metamx/druid/client/selector/QueryableDruidServer.java new file mode 100644 index 00000000000..2528facb8d8 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/selector/QueryableDruidServer.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.selector; + +import com.metamx.druid.client.DirectDruidClient; +import com.metamx.druid.client.DruidServer; + +/** + */ +public class QueryableDruidServer +{ + private final DruidServer server; + private final DirectDruidClient client; + + public QueryableDruidServer(DruidServer server, DirectDruidClient client) + { + this.server = server; + this.client = client; + } + + public DruidServer getServer() + { + return server; + } + + public DirectDruidClient getClient() + { + return client; + } +} diff --git a/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java b/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java index 22d56e46f69..086b117b59a 100644 --- a/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java +++ b/client/src/main/java/com/metamx/druid/client/selector/ServerSelector.java @@ -19,21 +19,30 @@ package com.metamx.druid.client.selector; -import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; -import java.util.LinkedHashSet; -import java.util.Random; +import java.util.Collections; +import java.util.Comparator; +import java.util.Set; /** */ public class ServerSelector { - private static final Random random = new Random(); + private static final Comparator comparator = new Comparator() + { + @Override + public int compare(QueryableDruidServer left, QueryableDruidServer right) + { + return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections()); + } + }; + + private final Set servers = Sets.newHashSet(); - private final LinkedHashSet servers = Sets.newLinkedHashSet(); private final DataSegment segment; public ServerSelector( @@ -49,7 +58,7 @@ public class ServerSelector } public void addServer( - DruidServer server + QueryableDruidServer server ) { synchronized (this) { @@ -57,7 +66,7 @@ public class ServerSelector } } - public boolean removeServer(DruidServer server) + public boolean removeServer(QueryableDruidServer server) { synchronized (this) { return servers.remove(server); @@ -71,15 +80,10 @@ public class ServerSelector } } - public DruidServer pick() + public QueryableDruidServer pick() { synchronized (this) { - final int size = servers.size(); - switch (size) { - case 0: return null; - case 1: return servers.iterator().next(); - default: return Iterables.get(servers, random.nextInt(size)); - } + return Collections.min(servers, comparator); } } } diff --git a/client/src/test/java/com/metamx/druid/client/selector/ServerSelectorTest.java b/client/src/test/java/com/metamx/druid/client/selector/ServerSelectorTest.java new file mode 100644 index 00000000000..9009ec3d246 --- /dev/null +++ b/client/src/test/java/com/metamx/druid/client/selector/ServerSelectorTest.java @@ -0,0 +1,121 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.selector; + +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.SettableFuture; +import com.metamx.druid.Druids; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DirectDruidClient; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.query.ReflectionQueryToolChestWarehouse; +import com.metamx.druid.query.timeboundary.TimeBoundaryQuery; +import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.RequestBuilder; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; + +/** + */ +public class ServerSelectorTest +{ + private HttpClient httpClient; + + @Before + public void setUp() throws Exception + { + httpClient = EasyMock.createMock(HttpClient.class); + } + + @Test + public void testPick() throws Exception + { + RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); + EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn(requestBuilder).atLeastOnce(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(SettableFuture.create()).atLeastOnce(); + EasyMock.replay(httpClient); + + final ServerSelector serverSelector = new ServerSelector( + new DataSegment( + "test", + new Interval("2013-01-01/2013-01-02"), + new DateTime("2013-01-01").toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 0L + ) + ); + + DirectDruidClient client1 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + new DefaultObjectMapper(new SmileFactory()), + httpClient, + "foo" + ); + DirectDruidClient client2 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + new DefaultObjectMapper(new SmileFactory()), + httpClient, + "foo2" + ); + + QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( + null, + client1 + ); + serverSelector.addServer(queryableDruidServer1); + QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( + null, + client2 + ); + serverSelector.addServer(queryableDruidServer2); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + + client1.run(query); + client1.run(query); + client1.run(query); + + Assert.assertTrue(client1.getNumOpenConnections() == 3); + + client2.run(query); + client2.run(query); + + Assert.assertTrue(client2.getNumOpenConnections() == 2); + + Assert.assertTrue(serverSelector.pick() == queryableDruidServer2); + + EasyMock.verify(httpClient); + } +} diff --git a/index-common/src/main/java/com/metamx/druid/index/column/ColumnDescriptor.java b/index-common/src/main/java/com/metamx/druid/index/column/ColumnDescriptor.java index 2f8281250bd..72918aee9fa 100644 --- a/index-common/src/main/java/com/metamx/druid/index/column/ColumnDescriptor.java +++ b/index-common/src/main/java/com/metamx/druid/index/column/ColumnDescriptor.java @@ -74,9 +74,9 @@ public class ColumnDescriptor return parts; } - public int numBytes() + public long numBytes() { - int retVal = 0; + long retVal = 0; for (ColumnPartSerde part : parts) { retVal += part.numBytes(); diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/ColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/ColumnPartSerde.java index 5cf1f54786b..556fab5ea4d 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/ColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/ColumnPartSerde.java @@ -38,7 +38,7 @@ import java.nio.channels.WritableByteChannel; }) public interface ColumnPartSerde { - public int numBytes(); + public long numBytes(); public void write(WritableByteChannel channel) throws IOException; public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder); } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java index b5ba4456e03..03e13cfe371 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/ComplexColumnPartSerde.java @@ -61,7 +61,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde } @Override - public int numBytes() + public long numBytes() { return column.getSerializedSize(); } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java index 5f7a38014d6..8f5b80af85c 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java @@ -49,7 +49,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde private final VSizeIndexed multiValuedColumn; private final GenericIndexed bitmaps; - private final int size; + private final long size; public DictionaryEncodedColumnPartSerde( GenericIndexed dictionary, @@ -63,7 +63,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde this.multiValuedColumn = multiValCol; this.bitmaps = bitmaps; - int size = dictionary.getSerializedSize(); + long size = dictionary.getSerializedSize(); if (singleValCol != null && multiValCol == null) { size += singleValCol.getSerializedSize(); } @@ -94,7 +94,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde } @Override - public int numBytes() + public long numBytes() { return 1 + size; } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/FloatGenericColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/FloatGenericColumnPartSerde.java index 85646b0299f..13f86775f30 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/FloatGenericColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/FloatGenericColumnPartSerde.java @@ -58,7 +58,7 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde } @Override - public int numBytes() + public long numBytes() { return compressedFloats.getSerializedSize(); } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/LongGenericColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/LongGenericColumnPartSerde.java index 122297cac81..e37f08dff81 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/LongGenericColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/LongGenericColumnPartSerde.java @@ -58,7 +58,7 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde } @Override - public int numBytes() + public long numBytes() { return compressedLongs.getSerializedSize(); } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsIndexedSupplier.java b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsIndexedSupplier.java index 1def2af031f..c93d09d4725 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsIndexedSupplier.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsIndexedSupplier.java @@ -150,7 +150,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier }; } - public int getSerializedSize() + public long getSerializedSize() { return baseFloatBuffers.getSerializedSize() + 1 + 4 + 4; } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsIndexedSupplier.java b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsIndexedSupplier.java index d66b68a0c3d..ee752e4a20f 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsIndexedSupplier.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsIndexedSupplier.java @@ -161,7 +161,7 @@ public class CompressedLongsIndexedSupplier implements Supplier }; } - public int getSerializedSize() + public long getSerializedSize() { return baseLongBuffers.getSerializedSize() + 1 + 4 + 4; } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index df3836d7945..ba141e2f819 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -630,7 +630,7 @@ public class IndexIO GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy); - final int numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16; + final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16; final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); dims9.writeToChannel(writer); diff --git a/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java index 5fed0c13b1f..99e003e844c 100644 --- a/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java +++ b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexed.java @@ -207,7 +207,7 @@ public class GenericIndexed implements Indexed return -(minIndex + 1); } - public int getSerializedSize() + public long getSerializedSize() { return theBuffer.remaining() + 2 + 4 + 4; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ThreadPoolTaskRunner.java similarity index 95% rename from merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java rename to merger/src/main/java/com/metamx/druid/merger/coordinator/ThreadPoolTaskRunner.java index 23fdf851c7d..2906c0196a8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ThreadPoolTaskRunner.java @@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.Query; import com.metamx.druid.merger.common.TaskStatus; @@ -51,20 +50,19 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; /** * Runs tasks in a JVM thread using an ExecutorService. */ -public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker +public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { private final TaskToolboxFactory toolboxFactory; private final ListeningExecutorService exec; private final Set runningItems = new ConcurrentSkipListSet(); - private static final EmittingLogger log = new EmittingLogger(ExecutorServiceTaskRunner.class); + private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); - public ExecutorServiceTaskRunner( + public ThreadPoolTaskRunner( TaskToolboxFactory toolboxFactory, ExecutorService exec ) diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java index 66ee23eb0df..93d7b770372 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java @@ -59,7 +59,7 @@ import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.merger.common.index.ChatHandlerProvider; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; -import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; +import com.metamx.druid.merger.coordinator.ThreadPoolTaskRunner; import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig; import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.utils.PropUtils; @@ -118,7 +118,7 @@ public class ExecutorNode extends BaseServerNode private ServiceAnnouncer serviceAnnouncer = null; private ServiceProvider coordinatorServiceProvider = null; private Server server = null; - private ExecutorServiceTaskRunner taskRunner = null; + private ThreadPoolTaskRunner taskRunner = null; private ExecutorLifecycle executorLifecycle = null; private ChatHandlerProvider chatHandlerProvider = null; @@ -247,7 +247,7 @@ public class ExecutorNode extends BaseServerNode executorLifecycle.join(); } - public ExecutorServiceTaskRunner getTaskRunner() + public ThreadPoolTaskRunner getTaskRunner() { return taskRunner; } @@ -414,7 +414,7 @@ public class ExecutorNode extends BaseServerNode { if (taskRunner == null) { this.taskRunner = lifecycle.addManagedInstance( - new ExecutorServiceTaskRunner( + new ThreadPoolTaskRunner( taskToolboxFactory, Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index f97a39d60d3..573e4e1cac8 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -291,7 +291,7 @@ public class RemoteTaskRunnerTest new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), cf, workerCuratorCoordinator, - new ExecutorServiceTaskRunner( + new ThreadPoolTaskRunner( new TaskToolboxFactory( new TaskConfig() { diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index 0134984f6cd..42a556d927c 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -162,7 +162,7 @@ public class TaskLifecycleTest new DefaultObjectMapper() ); - tr = new ExecutorServiceTaskRunner( + tr = new ThreadPoolTaskRunner( tb, Executors.newSingleThreadExecutor() ); diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index 155d9e20793..dceb3b9fafa 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -20,6 +20,7 @@ package com.metamx.druid.coordination; import com.google.common.base.Function; +import com.google.common.base.Predicates; import com.google.common.collect.Ordering; import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; @@ -257,6 +258,9 @@ public class ServerManager implements QuerySegmentWalker ); } } + ) + .filter( + Predicates.>notNull() ); return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index a464152ee90..662569e8485 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -51,13 +51,13 @@ public abstract class DruidMasterConfig @Config("druid.master.merger.on") public boolean isMergeSegments() { - return true; + return false; } @Config("druid.master.conversion.on") public boolean isConvertSegments() { - return true; + return false; } @Config("druid.master.merger.service")