From 09f7b181d4bac86c9c6192169aaa0d19a3b1c0ff Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 29 May 2013 17:04:25 -0700 Subject: [PATCH] first commit, working UT --- .../druid/client/CachingClusteredClient.java | 14 +- .../query/ChainedExecutionQueryRunner.java | 14 +- .../druid/query/PrioritizedCallable.java | 29 +++ .../druid/query/PriorityExecutorService.java | 145 +++++++++++++++ .../java/com/metamx/druid/query/Queries.java | 5 + .../query/PriorityExecutorServiceTest.java | 168 ++++++++++++++++++ pom.xml | 2 +- .../com/metamx/druid/http/ComputeNode.java | 20 +-- 8 files changed, 379 insertions(+), 18 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java create mode 100644 client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java create mode 100644 client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index bb1b18ecb7c..a30e660be22 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -49,6 +49,7 @@ import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.MetricManipulationFn; +import com.metamx.druid.query.Queries; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChestWarehouse; @@ -123,15 +124,18 @@ public class CachingClusteredClient implements QueryRunner final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null; final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); + final String priority = query.getContextValue("priority", Integer.toString(Queries.Priority.NORMAL.ordinal())); + + final Query prioritizedQuery = query.withOverriddenContext(ImmutableMap.of("priority", priority)); final Query rewrittenQuery; if (populateCache) { - rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true")); + rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true")); } else { - rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("intermediate", "true")); + rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("intermediate", "true")); } - VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource()); + VersionedIntervalTimeline timeline = serverView.getTimeline(prioritizedQuery.getDataSource()); if (timeline == null) { return Sequences.empty(); } @@ -147,7 +151,7 @@ public class CachingClusteredClient implements QueryRunner // Let tool chest filter out unneeded segments final List> filteredServersLookup = - toolChest.filterSegments(query, serversLookup); + toolChest.filterSegments(prioritizedQuery, serversLookup); for (TimelineObjectHolder holder : filteredServersLookup) { for (PartitionChunk chunk : holder.getObject()) { @@ -162,7 +166,7 @@ public class CachingClusteredClient implements QueryRunner final byte[] queryCacheKey; if (strategy != null) { - queryCacheKey = strategy.computeCacheKey(query); + queryCacheKey = strategy.computeCacheKey(prioritizedQuery); } else { queryCacheKey = null; } diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java index 52fc99a35b1..413d533d696 100644 --- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java +++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; @@ -99,8 +100,19 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Future> apply(final QueryRunner input) { return exec.submit( - new Callable>() + new PrioritizedCallable>() { + @Override + public int getPriority() + { + return Ints.tryParse( + query.getContextValue( + "priority", + Integer.toString(Queries.Priority.NORMAL.ordinal()) + ) + ); + } + @Override public List call() throws Exception { diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java new file mode 100644 index 00000000000..fea78256b59 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java @@ -0,0 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import java.util.concurrent.Callable; + +/** + */ +public interface PrioritizedCallable extends Callable +{ + public int getPriority(); +} diff --git a/client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java b/client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java new file mode 100644 index 00000000000..b2485e43e7c --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/PriorityExecutorService.java @@ -0,0 +1,145 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.lifecycle.Lifecycle; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + */ +public class PriorityExecutorService extends AbstractExecutorService +{ + public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) + { + final ExecutorService service = new PriorityExecutorService( + new ThreadPoolExecutor( + config.getNumThreads(), + config.getNumThreads(), + 0L, + TimeUnit.MILLISECONDS, + new PriorityBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build() + ) + ); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + } + + @Override + public void stop() + { + service.shutdownNow(); + } + } + ); + + return service; + } + + private final ThreadPoolExecutor threadPoolExecutor; + + public PriorityExecutorService( + ThreadPoolExecutor threadPoolExecutor + ) + { + this.threadPoolExecutor = threadPoolExecutor; + } + + @Override + public void shutdown() + { + threadPoolExecutor.shutdown(); + } + + @Override + public List shutdownNow() + { + return threadPoolExecutor.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return threadPoolExecutor.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return threadPoolExecutor.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException + { + return threadPoolExecutor.awaitTermination(l, timeUnit); + } + + @Override + public void execute(Runnable runnable) + { + threadPoolExecutor.execute(runnable); + } + + @Override + protected RunnableFuture newTaskFor(Callable tCallable) + { + return new PrioritizedFuture((PrioritizedCallable) tCallable); + } + + private static class PrioritizedFuture extends FutureTask implements Comparable + { + private final PrioritizedCallable callable; + + public PrioritizedFuture(PrioritizedCallable callable) + { + super(callable); + this.callable = callable; + } + + public int getPriority() + { + return callable.getPriority(); + } + + @Override + public int compareTo(PrioritizedFuture future) + { + return Ints.compare(getPriority(), future.getPriority()); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/query/Queries.java b/client/src/main/java/com/metamx/druid/query/Queries.java index 0ebc18e64fd..ad64621791e 100644 --- a/client/src/main/java/com/metamx/druid/query/Queries.java +++ b/client/src/main/java/com/metamx/druid/query/Queries.java @@ -34,6 +34,11 @@ import java.util.Set; */ public class Queries { + public static enum Priority + { + HIGH, NORMAL, LOW + } + public static void verifyAggregations( List aggFactories, List postAggs diff --git a/client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java b/client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java new file mode 100644 index 00000000000..cfb96f701f9 --- /dev/null +++ b/client/src/test/java/com/metamx/druid/query/PriorityExecutorServiceTest.java @@ -0,0 +1,168 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import com.google.common.collect.Lists; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.lifecycle.Lifecycle; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +/** + */ +public class PriorityExecutorServiceTest +{ + private ExecutorService exec; + private CountDownLatch latch; + private CountDownLatch finishLatch; + + @Before + public void setUp() throws Exception + { + exec = PriorityExecutorService.create( + new Lifecycle(), + new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 1; + } + } + ); + + latch = new CountDownLatch(1); + finishLatch = new CountDownLatch(3); + } + + /** + * Submits a normal priority task to block the queue, followed by low, high, normal priority tasks. + * Tests to see that the high priority task is executed first, followed by the normal and low priority tasks. + * + * @throws Exception + */ + @Test + public void testSubmit() throws Exception + { + final ConcurrentLinkedQueue order = new ConcurrentLinkedQueue(); + + exec.submit( + new PrioritizedCallable() + { + @Override + public int getPriority() + { + return Queries.Priority.NORMAL.ordinal(); + } + + @Override + public Void call() throws Exception + { + latch.await(); + return null; + } + } + ); + + exec.submit( + new PrioritizedCallable() + { + @Override + public int getPriority() + { + return Queries.Priority.LOW.ordinal(); + } + + @Override + public Void call() throws Exception + { + order.add(Queries.Priority.LOW); + finishLatch.countDown(); + return null; + } + } + ); + exec.submit( + new PrioritizedCallable() + { + @Override + public int getPriority() + { + return Queries.Priority.HIGH.ordinal(); + } + + @Override + public Void call() throws Exception + { + order.add(Queries.Priority.HIGH); + finishLatch.countDown(); + return null; + } + } + ); + exec.submit( + new PrioritizedCallable() + { + @Override + public int getPriority() + { + return Queries.Priority.NORMAL.ordinal(); + } + + @Override + public Void call() throws Exception + { + order.add(Queries.Priority.NORMAL); + finishLatch.countDown(); + return null; + } + } + ); + + latch.countDown(); + finishLatch.await(); + + Assert.assertTrue(order.size() == 3); + + List expected = Lists.newArrayList( + Queries.Priority.HIGH, + Queries.Priority.NORMAL, + Queries.Priority.LOW + ); + + int i = 0; + for (Queries.Priority priority : order) { + Assert.assertEquals(expected.get(i), priority); + i++; + } + } +} diff --git a/pom.xml b/pom.xml index 46661acd03e..2b2b4cf7633 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ UTF-8 0.22.3 - 2.0.1-21-22 + 2.0.2-21-22 diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index f538b69b4db..e047fcae22f 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -26,7 +26,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.metamx.common.ISE; import com.metamx.common.concurrent.ExecutorServiceConfig; -import com.metamx.common.concurrent.ExecutorServices; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; @@ -38,17 +37,14 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.SegmentLoader; -import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.query.MetricsEmittingExecutorService; +import com.metamx.druid.query.PriorityExecutorService; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.Monitor; import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; @@ -101,13 +97,15 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); + final ExecutorService innerExecutorService = PriorityExecutorService.create( + getLifecycle(), + getConfigFactory().buildWithReplacements( + ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") + ) + ); + final ExecutorService executorService = new MetricsEmittingExecutorService( - ExecutorServices.create( - getLifecycle(), - getConfigFactory().buildWithReplacements( - ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") - ) - ), + innerExecutorService, emitter, new ServiceMetricEvent.Builder() );