fix broken priority queries

This commit is contained in:
fjy 2014-02-14 15:05:30 -08:00
parent 3979eb270c
commit ef648b85f5
5 changed files with 31 additions and 6 deletions

View File

@ -22,7 +22,9 @@ package io.druid.query;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
public class MetricsEmittingExecutorService extends DelegatingExecutorService public class MetricsEmittingExecutorService extends DelegatingExecutorService
@ -44,6 +46,14 @@ public class MetricsEmittingExecutorService extends DelegatingExecutorService
this.metricBuilder = metricBuilder; this.metricBuilder = metricBuilder;
} }
@Override
public <T> Future<T> submit(Callable<T> tCallable)
{
emitMetrics();
return base.submit(tCallable);
}
@Override @Override
public void execute(Runnable runnable) public void execute(Runnable runnable)
{ {
@ -53,8 +63,8 @@ public class MetricsEmittingExecutorService extends DelegatingExecutorService
private void emitMetrics() private void emitMetrics()
{ {
if (base instanceof ThreadPoolExecutor) { if (base instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("exec/backlog", ((ThreadPoolExecutor) base).getQueue().size())); emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) base).getQueueSize()));
} }
} }
} }

View File

@ -68,6 +68,7 @@ public class PrioritizedExecutorService extends AbstractExecutorService
return service; return service;
} }
private static final int DEFAULT_PRIORITY = 0; private static final int DEFAULT_PRIORITY = 0;
@ -133,6 +134,11 @@ public class PrioritizedExecutorService extends AbstractExecutorService
return new PrioritizedFuture<T>((PrioritizedCallable) theCallable); return new PrioritizedFuture<T>((PrioritizedCallable) theCallable);
} }
public int getQueueSize()
{
return threadPoolExecutor.getQueue().size();
}
private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture> private static class PrioritizedFuture<V> extends FutureTask<V> implements Comparable<PrioritizedFuture>
{ {
private final PrioritizedCallable<V> callable; private final PrioritizedCallable<V> callable;

Binary file not shown.

View File

@ -206,7 +206,7 @@ for queries on events that exist in this JVM heap-based buffer. To avoid heap ov
problems, real-time nodes persist their in-memory indexes to disk either problems, real-time nodes persist their in-memory indexes to disk either
periodically or after some maximum row limit is reached. This persist process periodically or after some maximum row limit is reached. This persist process
converts data stored in the in-memory buffer to a column oriented storage converts data stored in the in-memory buffer to a column oriented storage
format described in \ref{sec:storage-format}. Each persisted index is immutable and format described in Section \ref{sec:storage-format}. Each persisted index is immutable and
real-time nodes load persisted indexes into off-heap memory such that they can real-time nodes load persisted indexes into off-heap memory such that they can
still be queried. Figure~\ref{fig:realtime_flow} illustrates the process. still be queried. Figure~\ref{fig:realtime_flow} illustrates the process.
@ -533,7 +533,7 @@ contain strings. Storing strings directly is unnecessarily costly and string
columns can be dictionary encoded instead. Dictionary encoding is a common columns can be dictionary encoded instead. Dictionary encoding is a common
method to compress data and has been used in other data stores such as method to compress data and has been used in other data stores such as
PowerDrill \cite{hall2012processing}. In the example in PowerDrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each publisher to an unique integer Table~\ref{tab:sample_data}, we can map each page to an unique integer
identifier. identifier.
\begin{verbatim} \begin{verbatim}
Justin Bieber -> 0 Justin Bieber -> 0

View File

@ -26,6 +26,7 @@ import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.ProvisionException; import com.google.inject.ProvisionException;
import com.metamx.common.concurrent.ExecutorServiceConfig; import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
@ -34,6 +35,7 @@ import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingExecutorService; import io.druid.query.MetricsEmittingExecutorService;
import io.druid.query.PrioritizedExecutorService;
import io.druid.server.DruidProcessingConfig; import io.druid.server.DruidProcessingConfig;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
@ -58,10 +60,17 @@ public class DruidProcessingModule implements Module
@Provides @Provides
@Processing @Processing
@ManageLifecycle @ManageLifecycle
public ExecutorService getProcessingExecutorService(ExecutorServiceConfig config, ServiceEmitter emitter) public ExecutorService getProcessingExecutorService(
ExecutorServiceConfig config,
ServiceEmitter emitter,
Lifecycle lifecycle
)
{ {
return new MetricsEmittingExecutorService( return new MetricsEmittingExecutorService(
Executors.newFixedThreadPool(config.getNumThreads(), Execs.makeThreadFactory(config.getFormatString())), PrioritizedExecutorService.create(
lifecycle,
config
),
emitter, emitter,
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
); );