diff --git a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java index 5cc0f45d07..0413205d41 100644 --- a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java +++ b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java @@ -18,14 +18,24 @@ */ package org.jclouds.concurrent.config; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Resource; import javax.inject.Named; import javax.inject.Singleton; import org.jclouds.Constants; +import org.jclouds.lifecycle.Closer; +import org.jclouds.logging.Logger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.NamingThreadFactory; import com.google.inject.AbstractModule; import com.google.inject.Provides; @@ -39,14 +49,35 @@ import com.google.inject.Provides; */ @ConfiguresExecutorService public class ExecutorServiceModule extends AbstractModule { - private final ExecutorService userThreads; - private final ExecutorService ioThreads; + + @VisibleForTesting + static final class ShutdownExecutorOnClose implements Closeable { + @Resource + protected Logger logger = Logger.NULL; + + private final ExecutorService service; + + private ShutdownExecutorOnClose(ExecutorService service) { + this.service = service; + } + + @Override + public void close() throws IOException { + List runnables = service.shutdownNow(); + if (runnables.size() > 0) + logger.warn("when shutting down executor %s, runnables outstanding: %s", service, + runnables); + } + } + + private final ExecutorService userExecutorFromConstructor; + private final ExecutorService ioExecutorFromConstructor; public ExecutorServiceModule( @Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) { - this.userThreads = userThreads; - this.ioThreads = ioThreads; + this.userExecutorFromConstructor = userThreads; + this.ioExecutorFromConstructor = ioThreads; } public ExecutorServiceModule() { @@ -60,22 +91,43 @@ public class ExecutorServiceModule extends AbstractModule { @Provides @Singleton @Named(Constants.PROPERTY_USER_THREADS) - ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int userThreads) { - return this.userThreads != null ? this.userThreads : userThreads == 0 ? Executors - .newCachedThreadPool(new NamingThreadFactory("user thread %d")) - : newNamedThreadPool("user thread %d", userThreads); - } - - public static ExecutorService newNamedThreadPool(String name, int maxCount) { - return Executors.newFixedThreadPool(maxCount, new NamingThreadFactory(name)); + ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, + Closer closer) { + if (userExecutorFromConstructor != null) + return shutdownOnClose(userExecutorFromConstructor, closer); + return shutdownOnClose(newThreadPoolNamed("user thread %d", count), closer); } @Provides @Singleton @Named(Constants.PROPERTY_IO_WORKER_THREADS) - ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int ioThreads) { - return this.ioThreads != null ? this.ioThreads : ioThreads == 0 ? Executors - .newCachedThreadPool(new NamingThreadFactory("i/o thread %d")) : newNamedThreadPool( - "i/o thread %d", ioThreads); + ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, + Closer closer) { + if (ioExecutorFromConstructor != null) + return shutdownOnClose(ioExecutorFromConstructor, closer); + return shutdownOnClose(newThreadPoolNamed("i/o thread %d", count), closer); } + + @VisibleForTesting + static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) { + closer.addToClose(new ShutdownExecutorOnClose(service)); + return service; + } + + @VisibleForTesting + static ExecutorService newCachedThreadPoolNamed(String name) { + return Executors.newCachedThreadPool(new NamingThreadFactory(name)); + } + + @VisibleForTesting + static ExecutorService newThreadPoolNamed(String name, int count) { + return count == 0 ? newCachedThreadPoolNamed(name) : newFixedThreadPoolNamed(name, count); + } + + @VisibleForTesting + static ExecutorService newFixedThreadPoolNamed(String name, int maxCount) { + return new ThreadPoolExecutor(0, maxCount, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue(), new NamingThreadFactory(name)); + } + } \ No newline at end of file diff --git a/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java b/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java new file mode 100644 index 0000000000..477d999578 --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java @@ -0,0 +1,55 @@ +/** + * + * Copyright (C) 2009 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ +package org.jclouds.concurrent.config; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.jclouds.lifecycle.Closer; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import com.google.inject.Guice; +import com.google.inject.Injector; + +/** + * + * @author Adrian Cole + */ +@Test +public class ExecutorServiceModuleTest { + + private Closer closer; + + @BeforeTest + public void setUp() throws Exception { + Injector i = Guice.createInjector(); + closer = i.getInstance(Closer.class); + } + + @Test + public void testShutdownOnClose() throws IOException { + ExecutorService executor = Executors.newCachedThreadPool(); + assert !executor.isShutdown(); + ExecutorServiceModule.shutdownOnClose(executor, closer); + closer.close(); + assert executor.isShutdown(); + } +} diff --git a/core/src/test/java/org/jclouds/lifecycle/config/LifeCycleModuleTest.java b/core/src/test/java/org/jclouds/lifecycle/config/LifeCycleModuleTest.java index 780200ba3c..7f74bfe9ea 100644 --- a/core/src/test/java/org/jclouds/lifecycle/config/LifeCycleModuleTest.java +++ b/core/src/test/java/org/jclouds/lifecycle/config/LifeCycleModuleTest.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.inject.Inject; import javax.inject.Named; import org.jclouds.Constants; @@ -39,7 +37,6 @@ import com.google.inject.Key; import com.google.inject.Provides; /** - * // TODO: Adrian: Document this! * * @author Adrian Cole */ @@ -98,45 +95,17 @@ public class LifeCycleModuleTest { assert executor.isShutdown(); } - static class PreDestroyable { - boolean isClosed = false; - private final ExecutorService userThreads; - private final ExecutorService ioThreads; - - @Inject - PreDestroyable(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, - @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) { - this.userThreads = userThreads; - this.ioThreads = ioThreads; - } - - @PreDestroy - public void close() { - assert !userThreads.isShutdown(); - assert !ioThreads.isShutdown(); - - isClosed = true; - } - } - @Test void testCloserPreDestroyOrder() throws IOException { - Injector i = createInjector().createChildInjector(new AbstractModule() { - protected void configure() { - bind(PreDestroyable.class); - } - }); + Injector i = createInjector(); ExecutorService userThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330 .named(Constants.PROPERTY_USER_THREADS))); assert !userThreads.isShutdown(); ExecutorService ioThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330 .named(Constants.PROPERTY_IO_WORKER_THREADS))); assert !ioThreads.isShutdown(); - PreDestroyable preDestroyable = i.getInstance(PreDestroyable.class); - assert !preDestroyable.isClosed; Closer closer = i.getInstance(Closer.class); closer.close(); - assert preDestroyable.isClosed; assert userThreads.isShutdown(); assert ioThreads.isShutdown(); }