make MetricsEmittingExecutorService listenable

This commit is contained in:
Xavier Léauté 2014-05-09 22:42:45 -07:00
parent ea318f01ee
commit 4280b4e8fa
1 changed files with 19 additions and 13 deletions

View File

@ -19,51 +19,57 @@
package io.druid.query;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class MetricsEmittingExecutorService extends DelegatingExecutorService
public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService
{
private final ExecutorService base;
private final ListeningExecutorService delegate;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;
public MetricsEmittingExecutorService(
ExecutorService base,
ListeningExecutorService delegate,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder
)
{
super(base);
super();
this.base = base;
this.delegate = delegate;
this.emitter = emitter;
this.metricBuilder = metricBuilder;
}
@Override
public <T> Future<T> submit(Callable<T> tCallable)
protected ListeningExecutorService delegate()
{
return delegate;
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> tCallable)
{
emitMetrics();
return base.submit(tCallable);
return delegate.submit(tCallable);
}
@Override
public void execute(Runnable runnable)
{
emitMetrics();
base.execute(runnable);
delegate.execute(runnable);
}
private void emitMetrics()
{
if (base instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) base).getQueueSize()));
if (delegate instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) delegate).getQueueSize()));
}
}
}