diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 44c35d78b13..72519cbeb8c 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -49,6 +49,7 @@ import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.MetricManipulationFn; +import com.metamx.druid.query.Queries; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChestWarehouse; @@ -125,12 +126,18 @@ public class CachingClusteredClient implements QueryRunner && strategy != null; final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); - final Query rewrittenQuery; + + ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder(); + + final String priority = query.getContextValue("priority", "0"); + contextBuilder.put("priority", priority); + if (populateCache) { - rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true")); - } else { - rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("intermediate", "true")); + contextBuilder.put("bySegment", "true"); } + contextBuilder.put("intermediate", "true"); + + final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java index 52fc99a35b1..0dd878c7aad 100644 --- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java +++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java @@ -34,7 +34,6 @@ import com.metamx.druid.Query; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -83,6 +82,8 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { + final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + return new BaseSequence>( new BaseSequence.IteratorMaker>() { @@ -99,7 +100,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Future> apply(final QueryRunner input) { return exec.submit( - new Callable>() + new PrioritizedCallable>(priority) { @Override public List call() throws Exception diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java new file mode 100644 index 00000000000..3771b8a03c2 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java @@ -0,0 +1,39 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import java.util.concurrent.Callable; + +/** + */ +public abstract class PrioritizedCallable implements Callable +{ + final int priority; + + public PrioritizedCallable(int priority) + { + this.priority = priority; + } + + public int getPriority() + { + return priority; + } +} diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java new file mode 100644 index 00000000000..5e13184de8b --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java @@ -0,0 +1,158 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.lifecycle.Lifecycle; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + */ +public class PrioritizedExecutorService extends AbstractExecutorService +{ + public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) + { + final ExecutorService service = new PrioritizedExecutorService( + new ThreadPoolExecutor( + config.getNumThreads(), + config.getNumThreads(), + 0L, + TimeUnit.MILLISECONDS, + new PriorityBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build() + ) + ); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + } + + @Override + public void stop() + { + service.shutdownNow(); + } + } + ); + + return service; + } + private static final int DEFAULT_PRIORITY = 0; + + + private final ThreadPoolExecutor threadPoolExecutor; + + public PrioritizedExecutorService( + ThreadPoolExecutor threadPoolExecutor + ) + { + this.threadPoolExecutor = threadPoolExecutor; + } + + @Override + public void shutdown() + { + threadPoolExecutor.shutdown(); + } + + @Override + public List shutdownNow() + { + return threadPoolExecutor.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return threadPoolExecutor.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return threadPoolExecutor.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException + { + return threadPoolExecutor.awaitTermination(l, timeUnit); + } + + @Override + public void execute(Runnable runnable) + { + threadPoolExecutor.execute(runnable); + } + + @Override + protected RunnableFuture newTaskFor(final Callable tCallable) + { + Callable theCallable = tCallable; + if (!(tCallable instanceof PrioritizedCallable)) { + theCallable = new PrioritizedCallable(DEFAULT_PRIORITY) + { + @Override + public T call() throws Exception + { + return tCallable.call(); + } + }; + } + return new PrioritizedFuture((PrioritizedCallable) theCallable); + } + + private static class PrioritizedFuture extends FutureTask implements Comparable + { + private final PrioritizedCallable callable; + + public PrioritizedFuture(PrioritizedCallable callable) + { + super(callable); + this.callable = callable; + } + + public int getPriority() + { + return callable.getPriority(); + } + + @Override + public int compareTo(PrioritizedFuture future) + { + return -Ints.compare(getPriority(), future.getPriority()); + } + } +} diff --git a/client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java b/client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java new file mode 100644 index 00000000000..b6ce2a9ee1c --- /dev/null +++ b/client/src/test/java/com/metamx/druid/query/PrioritizedExecutorServiceTest.java @@ -0,0 +1,135 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import com.google.common.collect.ImmutableList; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.lifecycle.Lifecycle; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +/** + */ +public class PrioritizedExecutorServiceTest +{ + private ExecutorService exec; + private CountDownLatch latch; + private CountDownLatch finishLatch; + + @Before + public void setUp() throws Exception + { + exec = PrioritizedExecutorService.create( + new Lifecycle(), + new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 1; + } + } + ); + + latch = new CountDownLatch(1); + finishLatch = new CountDownLatch(3); + } + + /** + * Submits a normal priority task to block the queue, followed by low, high, normal priority tasks. + * Tests to see that the high priority task is executed first, followed by the normal and low priority tasks. + * + * @throws Exception + */ + @Test + public void testSubmit() throws Exception + { + final ConcurrentLinkedQueue order = new ConcurrentLinkedQueue(); + + exec.submit( + new PrioritizedCallable(0) + { + @Override + public Void call() throws Exception + { + latch.await(); + return null; + } + } + ); + + exec.submit( + new PrioritizedCallable(-1) + { + @Override + public Void call() throws Exception + { + order.add(-1); + finishLatch.countDown(); + return null; + } + } + ); + exec.submit( + new PrioritizedCallable(0) + { + @Override + public Void call() throws Exception + { + order.add(0); + finishLatch.countDown(); + return null; + } + } + ); + exec.submit( + new PrioritizedCallable(2) + { + @Override + public Void call() throws Exception + { + order.add(2); + finishLatch.countDown(); + return null; + } + } + ); + + latch.countDown(); + finishLatch.await(); + + Assert.assertTrue(order.size() == 3); + + List expected = ImmutableList.of(2, 0, -1); + Assert.assertEquals(expected, ImmutableList.copyOf(order)); + } +} diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index f538b69b4db..44abd781931 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -26,7 +26,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.metamx.common.ISE; import com.metamx.common.concurrent.ExecutorServiceConfig; -import com.metamx.common.concurrent.ExecutorServices; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; @@ -38,17 +37,14 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.SegmentLoader; -import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.query.MetricsEmittingExecutorService; +import com.metamx.druid.query.PrioritizedExecutorService; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.Monitor; import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; @@ -101,13 +97,15 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); + final ExecutorService innerExecutorService = PrioritizedExecutorService.create( + getLifecycle(), + getConfigFactory().buildWithReplacements( + ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") + ) + ); + final ExecutorService executorService = new MetricsEmittingExecutorService( - ExecutorServices.create( - getLifecycle(), - getConfigFactory().buildWithReplacements( - ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") - ) - ), + innerExecutorService, emitter, new ServiceMetricEvent.Builder() );