first commit, working UT

This commit is contained in:
fjy 2013-05-29 17:04:25 -07:00
parent 73a392d94a
commit 09f7b181d4
8 changed files with 379 additions and 18 deletions

View File

@ -49,6 +49,7 @@ import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.Queries;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.QueryToolChestWarehouse;
@ -123,15 +124,18 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final String priority = query.getContextValue("priority", Integer.toString(Queries.Priority.NORMAL.ordinal()));
final Query<T> prioritizedQuery = query.withOverriddenContext(ImmutableMap.of("priority", priority));
final Query<T> rewrittenQuery;
if (populateCache) {
rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true"));
rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true"));
} else {
rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
rewrittenQuery = prioritizedQuery.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
}
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(prioritizedQuery.getDataSource());
if (timeline == null) {
return Sequences.empty();
}
@ -147,7 +151,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Let tool chest filter out unneeded segments
final List<TimelineObjectHolder<String, ServerSelector>> filteredServersLookup =
toolChest.filterSegments(query, serversLookup);
toolChest.filterSegments(prioritizedQuery, serversLookup);
for (TimelineObjectHolder<String, ServerSelector> holder : filteredServersLookup) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
@ -162,7 +166,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final byte[] queryCacheKey;
if (strategy != null) {
queryCacheKey = strategy.computeCacheKey(query);
queryCacheKey = strategy.computeCacheKey(prioritizedQuery);
} else {
queryCacheKey = null;
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@ -99,8 +100,19 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Future<List<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new Callable<List<T>>()
new PrioritizedCallable<List<T>>()
{
@Override
public int getPriority()
{
return Ints.tryParse(
query.getContextValue(
"priority",
Integer.toString(Queries.Priority.NORMAL.ordinal())
)
);
}
@Override
public List<T> call() throws Exception
{

View File

@ -0,0 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import java.util.concurrent.Callable;
/**
*/
public interface PrioritizedCallable<T> extends Callable<T>
{
public int getPriority();
}

View File

@ -0,0 +1,145 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
public class PriorityExecutorService extends AbstractExecutorService
{
public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
{
final ExecutorService service = new PriorityExecutorService(
new ThreadPoolExecutor(
config.getNumThreads(),
config.getNumThreads(),
0L,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
)
);
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
}
@Override
public void stop()
{
service.shutdownNow();
}
}
);
return service;
}
private final ThreadPoolExecutor threadPoolExecutor;
public PriorityExecutorService(
ThreadPoolExecutor threadPoolExecutor
)
{
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public void shutdown()
{
threadPoolExecutor.shutdown();
}
@Override
public List<Runnable> shutdownNow()
{
return threadPoolExecutor.shutdownNow();
}
@Override
public boolean isShutdown()
{
return threadPoolExecutor.isShutdown();
}
@Override
public boolean isTerminated()
{
return threadPoolExecutor.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{
return threadPoolExecutor.awaitTermination(l, timeUnit);
}
@Override
public void execute(Runnable runnable)
{
threadPoolExecutor.execute(runnable);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> tCallable)
{
return new PrioritizedFuture<T>((PrioritizedCallable) tCallable);
}
private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture>
{
private final PrioritizedCallable<V> callable;
public PrioritizedFuture(PrioritizedCallable<V> callable)
{
super(callable);
this.callable = callable;
}
public int getPriority()
{
return callable.getPriority();
}
@Override
public int compareTo(PrioritizedFuture future)
{
return Ints.compare(getPriority(), future.getPriority());
}
}
}

View File

@ -34,6 +34,11 @@ import java.util.Set;
*/
public class Queries
{
public static enum Priority
{
HIGH, NORMAL, LOW
}
public static void verifyAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs

View File

@ -0,0 +1,168 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
*/
public class PriorityExecutorServiceTest
{
private ExecutorService exec;
private CountDownLatch latch;
private CountDownLatch finishLatch;
@Before
public void setUp() throws Exception
{
exec = PriorityExecutorService.create(
new Lifecycle(),
new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 1;
}
}
);
latch = new CountDownLatch(1);
finishLatch = new CountDownLatch(3);
}
/**
* Submits a normal priority task to block the queue, followed by low, high, normal priority tasks.
* Tests to see that the high priority task is executed first, followed by the normal and low priority tasks.
*
* @throws Exception
*/
@Test
public void testSubmit() throws Exception
{
final ConcurrentLinkedQueue<Queries.Priority> order = new ConcurrentLinkedQueue<Queries.Priority>();
exec.submit(
new PrioritizedCallable<Void>()
{
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
}
@Override
public Void call() throws Exception
{
latch.await();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>()
{
@Override
public int getPriority()
{
return Queries.Priority.LOW.ordinal();
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.LOW);
finishLatch.countDown();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>()
{
@Override
public int getPriority()
{
return Queries.Priority.HIGH.ordinal();
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.HIGH);
finishLatch.countDown();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>()
{
@Override
public int getPriority()
{
return Queries.Priority.NORMAL.ordinal();
}
@Override
public Void call() throws Exception
{
order.add(Queries.Priority.NORMAL);
finishLatch.countDown();
return null;
}
}
);
latch.countDown();
finishLatch.await();
Assert.assertTrue(order.size() == 3);
List<Queries.Priority> expected = Lists.newArrayList(
Queries.Priority.HIGH,
Queries.Priority.NORMAL,
Queries.Priority.LOW
);
int i = 0;
for (Queries.Priority priority : order) {
Assert.assertEquals(expected.get(i), priority);
i++;
}
}
}

View File

@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.3</metamx.java-util.version>
<netflix.curator.version>2.0.1-21-22</netflix.curator.version>
<netflix.curator.version>2.0.2-21-22</netflix.curator.version>
</properties>
<modules>

View File

@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
@ -38,17 +37,14 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.PriorityExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
@ -101,13 +97,15 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService innerExecutorService = PriorityExecutorService.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
final ExecutorService executorService = new MetricsEmittingExecutorService(
ExecutorServices.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
),
innerExecutorService,
emitter,
new ServiceMetricEvent.Builder()
);