fix where executor service doesn't close when context does

This commit is contained in:
Adrian Cole 2010-02-16 08:28:56 -08:00
parent 19aed31158
commit 041ef28bcb
3 changed files with 124 additions and 48 deletions

View File

@ -18,14 +18,24 @@
*/ */
package org.jclouds.concurrent.config; package org.jclouds.concurrent.config;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import javax.inject.Named; import javax.inject.Named;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.jclouds.Constants; import org.jclouds.Constants;
import org.jclouds.lifecycle.Closer;
import org.jclouds.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.NamingThreadFactory; import com.google.common.util.concurrent.NamingThreadFactory;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
@ -39,14 +49,35 @@ import com.google.inject.Provides;
*/ */
@ConfiguresExecutorService @ConfiguresExecutorService
public class ExecutorServiceModule extends AbstractModule { public class ExecutorServiceModule extends AbstractModule {
private final ExecutorService userThreads;
private final ExecutorService ioThreads; @VisibleForTesting
static final class ShutdownExecutorOnClose implements Closeable {
@Resource
protected Logger logger = Logger.NULL;
private final ExecutorService service;
private ShutdownExecutorOnClose(ExecutorService service) {
this.service = service;
}
@Override
public void close() throws IOException {
List<Runnable> runnables = service.shutdownNow();
if (runnables.size() > 0)
logger.warn("when shutting down executor %s, runnables outstanding: %s", service,
runnables);
}
}
private final ExecutorService userExecutorFromConstructor;
private final ExecutorService ioExecutorFromConstructor;
public ExecutorServiceModule( public ExecutorServiceModule(
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) { @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
this.userThreads = userThreads; this.userExecutorFromConstructor = userThreads;
this.ioThreads = ioThreads; this.ioExecutorFromConstructor = ioThreads;
} }
public ExecutorServiceModule() { public ExecutorServiceModule() {
@ -60,22 +91,43 @@ public class ExecutorServiceModule extends AbstractModule {
@Provides @Provides
@Singleton @Singleton
@Named(Constants.PROPERTY_USER_THREADS) @Named(Constants.PROPERTY_USER_THREADS)
ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int userThreads) { ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count,
return this.userThreads != null ? this.userThreads : userThreads == 0 ? Executors Closer closer) {
.newCachedThreadPool(new NamingThreadFactory("user thread %d")) if (userExecutorFromConstructor != null)
: newNamedThreadPool("user thread %d", userThreads); return shutdownOnClose(userExecutorFromConstructor, closer);
} return shutdownOnClose(newThreadPoolNamed("user thread %d", count), closer);
public static ExecutorService newNamedThreadPool(String name, int maxCount) {
return Executors.newFixedThreadPool(maxCount, new NamingThreadFactory(name));
} }
@Provides @Provides
@Singleton @Singleton
@Named(Constants.PROPERTY_IO_WORKER_THREADS) @Named(Constants.PROPERTY_IO_WORKER_THREADS)
ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int ioThreads) { ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count,
return this.ioThreads != null ? this.ioThreads : ioThreads == 0 ? Executors Closer closer) {
.newCachedThreadPool(new NamingThreadFactory("i/o thread %d")) : newNamedThreadPool( if (ioExecutorFromConstructor != null)
"i/o thread %d", ioThreads); return shutdownOnClose(ioExecutorFromConstructor, closer);
return shutdownOnClose(newThreadPoolNamed("i/o thread %d", count), closer);
} }
@VisibleForTesting
static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
closer.addToClose(new ShutdownExecutorOnClose(service));
return service;
}
@VisibleForTesting
static ExecutorService newCachedThreadPoolNamed(String name) {
return Executors.newCachedThreadPool(new NamingThreadFactory(name));
}
@VisibleForTesting
static ExecutorService newThreadPoolNamed(String name, int count) {
return count == 0 ? newCachedThreadPoolNamed(name) : newFixedThreadPoolNamed(name, count);
}
@VisibleForTesting
static ExecutorService newFixedThreadPoolNamed(String name, int maxCount) {
return new ThreadPoolExecutor(0, maxCount, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name));
}
} }

View File

@ -0,0 +1,55 @@
/**
*
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.concurrent.config;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jclouds.lifecycle.Closer;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import com.google.inject.Guice;
import com.google.inject.Injector;
/**
*
* @author Adrian Cole
*/
@Test
public class ExecutorServiceModuleTest {
private Closer closer;
@BeforeTest
public void setUp() throws Exception {
Injector i = Guice.createInjector();
closer = i.getInstance(Closer.class);
}
@Test
public void testShutdownOnClose() throws IOException {
ExecutorService executor = Executors.newCachedThreadPool();
assert !executor.isShutdown();
ExecutorServiceModule.shutdownOnClose(executor, closer);
closer.close();
assert executor.isShutdown();
}
}

View File

@ -22,8 +22,6 @@ import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.jclouds.Constants; import org.jclouds.Constants;
@ -39,7 +37,6 @@ import com.google.inject.Key;
import com.google.inject.Provides; import com.google.inject.Provides;
/** /**
* // TODO: Adrian: Document this!
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
@ -98,45 +95,17 @@ public class LifeCycleModuleTest {
assert executor.isShutdown(); assert executor.isShutdown();
} }
static class PreDestroyable {
boolean isClosed = false;
private final ExecutorService userThreads;
private final ExecutorService ioThreads;
@Inject
PreDestroyable(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
this.userThreads = userThreads;
this.ioThreads = ioThreads;
}
@PreDestroy
public void close() {
assert !userThreads.isShutdown();
assert !ioThreads.isShutdown();
isClosed = true;
}
}
@Test @Test
void testCloserPreDestroyOrder() throws IOException { void testCloserPreDestroyOrder() throws IOException {
Injector i = createInjector().createChildInjector(new AbstractModule() { Injector i = createInjector();
protected void configure() {
bind(PreDestroyable.class);
}
});
ExecutorService userThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330 ExecutorService userThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330
.named(Constants.PROPERTY_USER_THREADS))); .named(Constants.PROPERTY_USER_THREADS)));
assert !userThreads.isShutdown(); assert !userThreads.isShutdown();
ExecutorService ioThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330 ExecutorService ioThreads = i.getInstance(Key.get(ExecutorService.class, Jsr330
.named(Constants.PROPERTY_IO_WORKER_THREADS))); .named(Constants.PROPERTY_IO_WORKER_THREADS)));
assert !ioThreads.isShutdown(); assert !ioThreads.isShutdown();
PreDestroyable preDestroyable = i.getInstance(PreDestroyable.class);
assert !preDestroyable.isClosed;
Closer closer = i.getInstance(Closer.class); Closer closer = i.getInstance(Closer.class);
closer.close(); closer.close();
assert preDestroyable.isClosed;
assert userThreads.isShutdown(); assert userThreads.isShutdown();
assert ioThreads.isShutdown(); assert ioThreads.isShutdown();
} }