Merge pull request #156 from metamx/query-prioritization

Query prioritization
This commit is contained in:
fjy 2013-06-06 16:05:47 -07:00
commit f1dea971f2
6 changed files with 355 additions and 17 deletions

View File

@ -49,6 +49,7 @@ import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.Queries;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.QueryToolChestWarehouse;
@ -125,12 +126,18 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final Query<T> rewrittenQuery;
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<String, String>();
final String priority = query.getContextValue("priority", "0");
contextBuilder.put("priority", priority);
if (populateCache) {
rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true"));
} else {
rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
contextBuilder.put("bySegment", "true");
}
contextBuilder.put("intermediate", "true");
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {

View File

@ -34,7 +34,6 @@ import com.metamx.druid.Query;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -83,6 +82,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@ -99,7 +100,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Future<List<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new Callable<List<T>>()
new PrioritizedCallable<List<T>>(priority)
{
@Override
public List<T> call() throws Exception

View File

@ -0,0 +1,39 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import java.util.concurrent.Callable;
/**
*/
public abstract class PrioritizedCallable<T> implements Callable<T>
{
final int priority;
public PrioritizedCallable(int priority)
{
this.priority = priority;
}
public int getPriority()
{
return priority;
}
}

View File

@ -0,0 +1,158 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
public class PrioritizedExecutorService extends AbstractExecutorService
{
public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
{
final ExecutorService service = new PrioritizedExecutorService(
new ThreadPoolExecutor(
config.getNumThreads(),
config.getNumThreads(),
0L,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
)
);
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
}
@Override
public void stop()
{
service.shutdownNow();
}
}
);
return service;
}
private static final int DEFAULT_PRIORITY = 0;
private final ThreadPoolExecutor threadPoolExecutor;
public PrioritizedExecutorService(
ThreadPoolExecutor threadPoolExecutor
)
{
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public void shutdown()
{
threadPoolExecutor.shutdown();
}
@Override
public List<Runnable> shutdownNow()
{
return threadPoolExecutor.shutdownNow();
}
@Override
public boolean isShutdown()
{
return threadPoolExecutor.isShutdown();
}
@Override
public boolean isTerminated()
{
return threadPoolExecutor.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{
return threadPoolExecutor.awaitTermination(l, timeUnit);
}
@Override
public void execute(Runnable runnable)
{
threadPoolExecutor.execute(runnable);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> tCallable)
{
Callable<T> theCallable = tCallable;
if (!(tCallable instanceof PrioritizedCallable)) {
theCallable = new PrioritizedCallable<T>(DEFAULT_PRIORITY)
{
@Override
public T call() throws Exception
{
return tCallable.call();
}
};
}
return new PrioritizedFuture<T>((PrioritizedCallable) theCallable);
}
private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture>
{
private final PrioritizedCallable<V> callable;
public PrioritizedFuture(PrioritizedCallable<V> callable)
{
super(callable);
this.callable = callable;
}
public int getPriority()
{
return callable.getPriority();
}
@Override
public int compareTo(PrioritizedFuture future)
{
return -Ints.compare(getPriority(), future.getPriority());
}
}
}

View File

@ -0,0 +1,135 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query;
import com.google.common.collect.ImmutableList;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
*/
public class PrioritizedExecutorServiceTest
{
private ExecutorService exec;
private CountDownLatch latch;
private CountDownLatch finishLatch;
@Before
public void setUp() throws Exception
{
exec = PrioritizedExecutorService.create(
new Lifecycle(),
new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 1;
}
}
);
latch = new CountDownLatch(1);
finishLatch = new CountDownLatch(3);
}
/**
* Submits a normal priority task to block the queue, followed by low, high, normal priority tasks.
* Tests to see that the high priority task is executed first, followed by the normal and low priority tasks.
*
* @throws Exception
*/
@Test
public void testSubmit() throws Exception
{
final ConcurrentLinkedQueue<Integer> order = new ConcurrentLinkedQueue<Integer>();
exec.submit(
new PrioritizedCallable<Void>(0)
{
@Override
public Void call() throws Exception
{
latch.await();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>(-1)
{
@Override
public Void call() throws Exception
{
order.add(-1);
finishLatch.countDown();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>(0)
{
@Override
public Void call() throws Exception
{
order.add(0);
finishLatch.countDown();
return null;
}
}
);
exec.submit(
new PrioritizedCallable<Void>(2)
{
@Override
public Void call() throws Exception
{
order.add(2);
finishLatch.countDown();
return null;
}
}
);
latch.countDown();
finishLatch.await();
Assert.assertTrue(order.size() == 3);
List<Integer> expected = ImmutableList.of(2, 0, -1);
Assert.assertEquals(expected, ImmutableList.copyOf(order));
}
}

View File

@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
@ -38,17 +37,14 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.PrioritizedExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
@ -101,13 +97,15 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService innerExecutorService = PrioritizedExecutorService.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
final ExecutorService executorService = new MetricsEmittingExecutorService(
ExecutorServices.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
),
innerExecutorService,
emitter,
new ServiceMetricEvent.Builder()
);