From 37a39940034422c7cc1d4eeaa6934c9d9933ed74 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 7 Jun 2013 14:08:51 -0700 Subject: [PATCH] add delegating executor service and fix bug with query priortization --- .../query/DelegatingExecutorService.java | 127 ++++++++++++++++++ .../query/MetricsEmittingExecutorService.java | 59 ++++---- .../query/PrioritizedExecutorService.java | 4 +- .../worker/WorkerCuratorCoordinator.java | 3 - .../com/metamx/druid/http/ComputeNode.java | 2 +- 5 files changed, 158 insertions(+), 37 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java diff --git a/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java new file mode 100644 index 00000000000..f9d377694e8 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java @@ -0,0 +1,127 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + */ +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService(ExecutorService delegate) + { + this.delegate = delegate; + } + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException + { + return delegate.awaitTermination(l, timeUnit); + } + + @Override + public Future submit(Callable tCallable) + { + return delegate.submit(tCallable); + } + + @Override + public Future submit(Runnable runnable, T t) + { + return delegate.submit(runnable, t); + } + + @Override + public Future submit(Runnable runnable) + { + return delegate.submit(runnable); + } + + @Override + public List> invokeAll(Collection> callables) throws InterruptedException + { + return delegate.invokeAll(callables); + } + + @Override + public List> invokeAll( + Collection> callables, + long l, + TimeUnit timeUnit + ) throws InterruptedException + { + return delegate.invokeAll(callables, l, timeUnit); + } + + @Override + public T invokeAny(Collection> callables) throws InterruptedException, ExecutionException + { + return delegate.invokeAny(callables); + } + + @Override + public T invokeAny( + Collection> callables, + long l, + TimeUnit timeUnit + ) throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(callables, l, timeUnit); + } + + @Override + public void execute(Runnable runnable) + { + delegate.execute(runnable); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java index 2e453ddbe0b..f109d2f783a 100644 --- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java @@ -1,15 +1,40 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.query; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import java.util.Collection; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; -public class MetricsEmittingExecutorService extends AbstractExecutorService +public class MetricsEmittingExecutorService extends DelegatingExecutorService { private final ExecutorService base; private final ServiceEmitter emitter; @@ -21,41 +46,13 @@ public class MetricsEmittingExecutorService extends AbstractExecutorService ServiceMetricEvent.Builder metricBuilder ) { + super(base); + this.base = base; this.emitter = emitter; this.metricBuilder = metricBuilder; } - @Override - public void shutdown() - { - base.shutdown(); - } - - @Override - public List shutdownNow() - { - return base.shutdownNow(); - } - - @Override - public boolean isShutdown() - { - return base.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return base.isTerminated(); - } - - @Override - public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException - { - return base.awaitTermination(l, timeUnit); - } - @Override public void execute(Runnable runnable) { diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java index 5e13184de8b..f943a0c112f 100644 --- a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java @@ -38,9 +38,9 @@ import java.util.concurrent.TimeUnit; */ public class PrioritizedExecutorService extends AbstractExecutorService { - public static ExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) + public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config) { - final ExecutorService service = new PrioritizedExecutorService( + final PrioritizedExecutorService service = new PrioritizedExecutorService( new ThreadPoolExecutor( config.getNumThreads(), config.getNumThreads(), diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java index 44b308165b8..77e70f52a5f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -32,10 +32,7 @@ import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.config.IndexerZkConfig; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.zookeeper.CreateMode; - import org.joda.time.DateTime; import java.util.Arrays; diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 44abd781931..7cd0990d392 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -97,7 +97,7 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); - final ExecutorService innerExecutorService = PrioritizedExecutorService.create( + final PrioritizedExecutorService innerExecutorService = PrioritizedExecutorService.create( getLifecycle(), getConfigFactory().buildWithReplacements( ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")