Added module to configure an ScheduledExecutorService

This commit is contained in:
Ignasi Barrera 2012-04-16 19:12:32 +02:00
parent cedd906bdb
commit 767cb98459
7 changed files with 318 additions and 8 deletions

View File

@ -41,6 +41,13 @@ public interface Constants {
*/ */
public static final String PROPERTY_IO_WORKER_THREADS = "jclouds.io-worker-threads"; public static final String PROPERTY_IO_WORKER_THREADS = "jclouds.io-worker-threads";
/**
* Integer property. default (10)
* <p/>
* Amount of threads servicing scheduled tasks.
*/
public static final String PROPERTY_SCHEDULER_THREADS = "jclouds.scheduler-threads";
/** /**
* Integer property. default (20) * Integer property. default (20)
* <p/> * <p/>

View File

@ -31,6 +31,7 @@ import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS;
import static org.jclouds.Constants.PROPERTY_SESSION_INTERVAL; import static org.jclouds.Constants.PROPERTY_SESSION_INTERVAL;
import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT;
import static org.jclouds.Constants.PROPERTY_USER_THREADS; import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
import java.io.Closeable; import java.io.Closeable;
import java.net.URI; import java.net.URI;
@ -71,6 +72,7 @@ public abstract class BaseApiMetadata implements ApiMetadata {
props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + ""); props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + "");
props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + ""); props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + "");
props.setProperty(PROPERTY_USER_THREADS, 0 + ""); props.setProperty(PROPERTY_USER_THREADS, 0 + "");
props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + "");
props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + ""); props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + "");
props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + ""); props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + "");
props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + ""); props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + "");

View File

@ -124,7 +124,7 @@ public class ExecutorServiceModule extends AbstractModule {
static class DescribingExecutorService implements ExecutorService { static class DescribingExecutorService implements ExecutorService {
private final ExecutorService delegate; protected final ExecutorService delegate;
public DescribingExecutorService(ExecutorService delegate) { public DescribingExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
@ -216,7 +216,7 @@ public class ExecutorServiceModule extends AbstractModule {
} }
static class DescribedFuture<T> implements Future<T> { static class DescribedFuture<T> implements Future<T> {
private final Future<T> delegate; protected final Future<T> delegate;
private final String description; private final String description;
private StackTraceElement[] submissionTrace; private StackTraceElement[] submissionTrace;
@ -340,7 +340,7 @@ public class ExecutorServiceModule extends AbstractModule {
} }
@VisibleForTesting @VisibleForTesting
static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) { static <T extends ExecutorService> T shutdownOnClose(final T service, Closer closer) {
closer.addToClose(new ShutdownExecutorOnClose(service)); closer.addToClose(new ShutdownExecutorOnClose(service));
return service; return service;
} }

View File

@ -0,0 +1,121 @@
package org.jclouds.concurrent.config;
import static org.jclouds.concurrent.config.ExecutorServiceModule.shutdownOnClose;
import static org.jclouds.concurrent.config.ExecutorServiceModule.getStackTraceHere;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.concurrent.config.ExecutorServiceModule.DescribedFuture;
import org.jclouds.concurrent.config.ExecutorServiceModule.DescribingExecutorService;
import org.jclouds.lifecycle.Closer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
/**
* Provides an {@link ScheduledExecutorService} to run periodical tasks such as virtual machine monitoring, etc.
* <p>
* This module is not registered by default in the context because some providers do not allow to spawn threads.
*
* @author Ignasi Barrera
*
* @see ExecutorServiceModule
*
*/
public class ScheduledExecutorServiceModule extends AbstractModule {
static class DescribingScheduledExecutorService extends DescribingExecutorService implements ScheduledExecutorService {
public DescribingScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate)
.schedule(command, delay, unit), command.toString(), getStackTraceHere());
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return new DescribedScheduledFuture<V>(((ScheduledExecutorService) delegate)
.schedule(callable, delay, unit), callable.toString(), getStackTraceHere());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate)
.scheduleAtFixedRate(command, initialDelay, period, unit), command.toString(), getStackTraceHere());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate)
.scheduleWithFixedDelay(command, initialDelay, delay, unit), command.toString(), getStackTraceHere());
}
}
static class DescribedScheduledFuture<T> extends DescribedFuture<T> implements ScheduledFuture<T> {
public DescribedScheduledFuture(ScheduledFuture<T> delegate, String description,
StackTraceElement[] submissionTrace) {
super(delegate, description, submissionTrace);
}
@Override
public long getDelay(TimeUnit unit) {
return ((ScheduledFuture<T>) delegate).getDelay(unit);
}
@Override
public int compareTo(Delayed o) {
return ((ScheduledFuture<T>) delegate).compareTo(o);
}
}
static ScheduledExecutorService addToStringOnSchedule(ScheduledExecutorService executor) {
if (executor != null) {
return new DescribingScheduledExecutorService(executor);
}
return executor;
}
@Provides
@Singleton
@Named(Constants.PROPERTY_SCHEDULER_THREADS)
ScheduledExecutorService provideScheduledExecutor(@Named(Constants.PROPERTY_SCHEDULER_THREADS) final int count,
final Closer closer) {
return shutdownOnClose(addToStringOnSchedule(newScheduledThreadPoolNamed("scheduler thread %d", count)), closer);
}
@VisibleForTesting
static ScheduledExecutorService newScheduledThreadPoolNamed(String name, int maxCount) {
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(name)
.setThreadFactory(Executors.defaultThreadFactory()).build();
return maxCount == 0 ? Executors.newSingleThreadScheduledExecutor(factory)
: Executors.newScheduledThreadPool(maxCount, factory);
}
@Override
protected void configure() {
}
}

View File

@ -28,10 +28,10 @@ import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.jclouds.Constants; import org.jclouds.Constants;
@ -41,6 +41,7 @@ import org.jclouds.lifecycle.Closer;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ExecutionList;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Stage; import com.google.inject.Stage;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.spi.InjectionListener; import com.google.inject.spi.InjectionListener;
@ -74,12 +75,19 @@ public class LifeCycleModule extends AbstractModule {
@Inject @Inject
@Named(Constants.PROPERTY_IO_WORKER_THREADS) @Named(Constants.PROPERTY_IO_WORKER_THREADS)
ExecutorService ioExecutor; ExecutorService ioExecutor;
// ScheduledExecutor is defined in an optional module
@Inject(optional = true)
@Named(Constants.PROPERTY_SCHEDULER_THREADS)
ScheduledExecutorService scheduledExecutor;
public void close() throws IOException { public void close() throws IOException {
assert userExecutor != null; assert userExecutor != null;
userExecutor.shutdownNow(); userExecutor.shutdownNow();
assert ioExecutor != null; assert ioExecutor != null;
ioExecutor.shutdownNow(); ioExecutor.shutdownNow();
// ScheduledExecutor is defined in an optional module
if (scheduledExecutor != null)
scheduledExecutor.shutdownNow();
} }
}; };

View File

@ -230,7 +230,7 @@ Caused by: java.lang.IllegalStateException: foo
return io.submit((Runnable)t1, (Object)"shouldn't happen"); return io.submit((Runnable)t1, (Object)"shouldn't happen");
} }
static void checkFutureGetFailsWith(Future<Object> task, String ...requiredPhrases) throws Exception { static void checkFutureGetFailsWith(Future<?> task, String ...requiredPhrases) throws Exception {
try { try {
task.get(); task.get();
assert false : "task should have failed"; assert false : "task should have failed";
@ -242,9 +242,9 @@ Caused by: java.lang.IllegalStateException: foo
} }
} }
private static class ConfigurableRunner implements Runnable, Callable<Object> { static class ConfigurableRunner implements Runnable, Callable<Object> {
private Object result; Object result;
private String failMessage; String failMessage;
@Override @Override
public void run() { public void run() {

View File

@ -0,0 +1,172 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.concurrent.config;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.easymock.EasyMock;
import org.jclouds.Constants;
import org.jclouds.concurrent.config.ExecutorServiceModuleTest.ConfigurableRunner;
import org.jclouds.lifecycle.Closer;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
/**
* Unit tests for the {@link ScheduledExecutorServiceModule} class.
*
* @author Ignasi Barrera
*
* @see ExecutorServiceModuleTest
*/
@Test(groups = "unit")
public class ScheduledExecutorServiceModuleTest {
@Test(groups = "unit")
public void testShutdownOnClose() throws IOException {
Injector i = Guice.createInjector();
Closer closer = i.getInstance(Closer.class);
ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
ExecutorServiceModule.shutdownOnClose(executor, closer);
expect(executor.shutdownNow()).andReturn(ImmutableList.<Runnable> of()).atLeastOnce();
replay(executor);
closer.close();
verify(executor);
}
@Test(groups = "unit")
public void testShutdownOnCloseThroughModule() throws IOException {
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
Closer closer = i.getInstance(Closer.class);
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
assert !sched.isShutdown();
closer.close();
assert sched.isShutdown();
}
@Test(groups = "unit")
public void testDescribedFutureToString() throws Exception {
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
Closer closer = i.getInstance(Closer.class);
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
ConfigurableRunner t1 = new ConfigurableRunner();
t1.result = "okay";
ScheduledFuture<Object> esc = performScheduleInSeparateMethod1(sched, t1);
assert esc.toString().indexOf("ConfigurableRunner") >= 0;
assert esc.get().equals("okay");
closer.close();
}
@Test(groups = "unit")
public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception {
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
Closer closer = i.getInstance(Closer.class);
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
ConfigurableRunner t1 = new ConfigurableRunner();
t1.failMessage = "foo";
t1.result = "shouldn't happen";
ScheduledFuture<Object> esc = performScheduleInSeparateMethod1(sched, t1);
checkFutureGetFailsWith(esc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod1");
ScheduledFuture<?> esr = performScheduleInSeparateMethod2(sched, t1);
checkFutureGetFailsWith(esr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod2");
ScheduledFuture<?> esfr = performScheduleInSeparateMethod3(sched, t1);
checkFutureGetFailsWith(esfr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod3");
ScheduledFuture<?> esfd = performScheduleInSeparateMethod4(sched, t1);
checkFutureGetFailsWith(esfd, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod4");
closer.close();
}
static ScheduledFuture<Object> performScheduleInSeparateMethod1(ScheduledExecutorService sched, ConfigurableRunner t1) {
return sched.schedule((Callable<Object>)t1, 0, TimeUnit.SECONDS);
}
static ScheduledFuture<?> performScheduleInSeparateMethod2(ScheduledExecutorService sched, ConfigurableRunner t1) {
return sched.schedule((Runnable)t1, 0, TimeUnit.SECONDS);
}
static ScheduledFuture<?> performScheduleInSeparateMethod3(ScheduledExecutorService sched, ConfigurableRunner t1) {
return sched.scheduleAtFixedRate((Runnable)t1, 0, 1, TimeUnit.SECONDS);
}
static ScheduledFuture<?> performScheduleInSeparateMethod4(ScheduledExecutorService sched, ConfigurableRunner t1) {
return sched.scheduleWithFixedDelay((Runnable)t1, 0, 1, TimeUnit.SECONDS);
}
}