From 767cb984593c57902922283dec332e708a73aa4a Mon Sep 17 00:00:00 2001 From: Ignasi Barrera Date: Mon, 16 Apr 2012 19:12:32 +0200 Subject: [PATCH] Added module to configure an ScheduledExecutorService --- core/src/main/java/org/jclouds/Constants.java | 7 + .../apis/internal/BaseApiMetadata.java | 2 + .../config/ExecutorServiceModule.java | 6 +- .../ScheduledExecutorServiceModule.java | 121 ++++++++++++ .../lifecycle/config/LifeCycleModule.java | 10 +- .../config/ExecutorServiceModuleTest.java | 8 +- .../ScheduledExecutorServiceModuleTest.java | 172 ++++++++++++++++++ 7 files changed, 318 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModule.java create mode 100644 core/src/test/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModuleTest.java diff --git a/core/src/main/java/org/jclouds/Constants.java b/core/src/main/java/org/jclouds/Constants.java index 3ec3293779..753d96a11a 100644 --- a/core/src/main/java/org/jclouds/Constants.java +++ b/core/src/main/java/org/jclouds/Constants.java @@ -41,6 +41,13 @@ public interface Constants { */ public static final String PROPERTY_IO_WORKER_THREADS = "jclouds.io-worker-threads"; + /** + * Integer property. default (10) + *

+ * Amount of threads servicing scheduled tasks. + */ + public static final String PROPERTY_SCHEDULER_THREADS = "jclouds.scheduler-threads"; + /** * Integer property. default (20) *

diff --git a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java index ac87f3641b..861948c39e 100644 --- a/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java +++ b/core/src/main/java/org/jclouds/apis/internal/BaseApiMetadata.java @@ -31,6 +31,7 @@ import static org.jclouds.Constants.PROPERTY_PRETTY_PRINT_PAYLOADS; import static org.jclouds.Constants.PROPERTY_SESSION_INTERVAL; import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; import static org.jclouds.Constants.PROPERTY_USER_THREADS; +import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS; import java.io.Closeable; import java.net.URI; @@ -71,6 +72,7 @@ public abstract class BaseApiMetadata implements ApiMetadata { props.setProperty(PROPERTY_CONNECTION_TIMEOUT, 60000 + ""); props.setProperty(PROPERTY_IO_WORKER_THREADS, 20 + ""); props.setProperty(PROPERTY_USER_THREADS, 0 + ""); + props.setProperty(PROPERTY_SCHEDULER_THREADS, 10 + ""); props.setProperty(PROPERTY_MAX_CONNECTION_REUSE, 75 + ""); props.setProperty(PROPERTY_MAX_SESSION_FAILURES, 2 + ""); props.setProperty(PROPERTY_SESSION_INTERVAL, 60 + ""); diff --git a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java index e61c7fc824..f9400a121b 100644 --- a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java +++ b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java @@ -124,7 +124,7 @@ public class ExecutorServiceModule extends AbstractModule { static class DescribingExecutorService implements ExecutorService { - private final ExecutorService delegate; + protected final ExecutorService delegate; public DescribingExecutorService(ExecutorService delegate) { this.delegate = checkNotNull(delegate, "delegate"); @@ -216,7 +216,7 @@ public class ExecutorServiceModule extends AbstractModule { } static class DescribedFuture implements Future { - private final Future delegate; + protected final Future delegate; private final String description; private StackTraceElement[] submissionTrace; @@ -340,7 +340,7 @@ public class ExecutorServiceModule extends AbstractModule { } @VisibleForTesting - static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) { + static T shutdownOnClose(final T service, Closer closer) { closer.addToClose(new ShutdownExecutorOnClose(service)); return service; } diff --git a/core/src/main/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModule.java b/core/src/main/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModule.java new file mode 100644 index 0000000000..c3d53ce56d --- /dev/null +++ b/core/src/main/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModule.java @@ -0,0 +1,121 @@ +package org.jclouds.concurrent.config; + +import static org.jclouds.concurrent.config.ExecutorServiceModule.shutdownOnClose; +import static org.jclouds.concurrent.config.ExecutorServiceModule.getStackTraceHere; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import javax.inject.Named; +import javax.inject.Singleton; + +import org.jclouds.Constants; +import org.jclouds.concurrent.config.ExecutorServiceModule.DescribedFuture; +import org.jclouds.concurrent.config.ExecutorServiceModule.DescribingExecutorService; +import org.jclouds.lifecycle.Closer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; + +/** + * Provides an {@link ScheduledExecutorService} to run periodical tasks such as virtual machine monitoring, etc. + *

+ * This module is not registered by default in the context because some providers do not allow to spawn threads. + * + * @author Ignasi Barrera + * + * @see ExecutorServiceModule + * + */ +public class ScheduledExecutorServiceModule extends AbstractModule { + + static class DescribingScheduledExecutorService extends DescribingExecutorService implements ScheduledExecutorService { + + public DescribingScheduledExecutorService(ScheduledExecutorService delegate) { + super(delegate); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return new DescribedScheduledFuture(((ScheduledExecutorService) delegate) + .schedule(command, delay, unit), command.toString(), getStackTraceHere()); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return new DescribedScheduledFuture(((ScheduledExecutorService) delegate) + .schedule(callable, delay, unit), callable.toString(), getStackTraceHere()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, + long period, TimeUnit unit) { + return new DescribedScheduledFuture(((ScheduledExecutorService) delegate) + .scheduleAtFixedRate(command, initialDelay, period, unit), command.toString(), getStackTraceHere()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, + long delay, TimeUnit unit) { + return new DescribedScheduledFuture(((ScheduledExecutorService) delegate) + .scheduleWithFixedDelay(command, initialDelay, delay, unit), command.toString(), getStackTraceHere()); + } + } + + static class DescribedScheduledFuture extends DescribedFuture implements ScheduledFuture { + + public DescribedScheduledFuture(ScheduledFuture delegate, String description, + StackTraceElement[] submissionTrace) { + super(delegate, description, submissionTrace); + } + + @Override + public long getDelay(TimeUnit unit) { + return ((ScheduledFuture) delegate).getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return ((ScheduledFuture) delegate).compareTo(o); + } + } + + static ScheduledExecutorService addToStringOnSchedule(ScheduledExecutorService executor) { + if (executor != null) { + return new DescribingScheduledExecutorService(executor); + } + return executor; + } + + @Provides + @Singleton + @Named(Constants.PROPERTY_SCHEDULER_THREADS) + ScheduledExecutorService provideScheduledExecutor(@Named(Constants.PROPERTY_SCHEDULER_THREADS) final int count, + final Closer closer) { + return shutdownOnClose(addToStringOnSchedule(newScheduledThreadPoolNamed("scheduler thread %d", count)), closer); + } + + @VisibleForTesting + static ScheduledExecutorService newScheduledThreadPoolNamed(String name, int maxCount) { + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(name) + .setThreadFactory(Executors.defaultThreadFactory()).build(); + return maxCount == 0 ? Executors.newSingleThreadScheduledExecutor(factory) + : Executors.newScheduledThreadPool(maxCount, factory); + } + + @Override + protected void configure() { + + } + +} diff --git a/core/src/main/java/org/jclouds/lifecycle/config/LifeCycleModule.java b/core/src/main/java/org/jclouds/lifecycle/config/LifeCycleModule.java index 6f5e85c7f4..5ecc35fd13 100644 --- a/core/src/main/java/org/jclouds/lifecycle/config/LifeCycleModule.java +++ b/core/src/main/java/org/jclouds/lifecycle/config/LifeCycleModule.java @@ -28,10 +28,10 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.inject.Inject; import javax.inject.Named; import org.jclouds.Constants; @@ -41,6 +41,7 @@ import org.jclouds.lifecycle.Closer; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ExecutionList; import com.google.inject.AbstractModule; +import com.google.inject.Inject; import com.google.inject.Stage; import com.google.inject.TypeLiteral; import com.google.inject.spi.InjectionListener; @@ -74,12 +75,19 @@ public class LifeCycleModule extends AbstractModule { @Inject @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor; + // ScheduledExecutor is defined in an optional module + @Inject(optional = true) + @Named(Constants.PROPERTY_SCHEDULER_THREADS) + ScheduledExecutorService scheduledExecutor; public void close() throws IOException { assert userExecutor != null; userExecutor.shutdownNow(); assert ioExecutor != null; ioExecutor.shutdownNow(); + // ScheduledExecutor is defined in an optional module + if (scheduledExecutor != null) + scheduledExecutor.shutdownNow(); } }; diff --git a/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java b/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java index 77784d6740..1e7a70b79f 100644 --- a/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java +++ b/core/src/test/java/org/jclouds/concurrent/config/ExecutorServiceModuleTest.java @@ -230,7 +230,7 @@ Caused by: java.lang.IllegalStateException: foo return io.submit((Runnable)t1, (Object)"shouldn't happen"); } - static void checkFutureGetFailsWith(Future task, String ...requiredPhrases) throws Exception { + static void checkFutureGetFailsWith(Future task, String ...requiredPhrases) throws Exception { try { task.get(); assert false : "task should have failed"; @@ -242,9 +242,9 @@ Caused by: java.lang.IllegalStateException: foo } } - private static class ConfigurableRunner implements Runnable, Callable { - private Object result; - private String failMessage; + static class ConfigurableRunner implements Runnable, Callable { + Object result; + String failMessage; @Override public void run() { diff --git a/core/src/test/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModuleTest.java b/core/src/test/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModuleTest.java new file mode 100644 index 0000000000..990798dbcb --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/config/ScheduledExecutorServiceModuleTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.concurrent.config; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.easymock.EasyMock; +import org.jclouds.Constants; +import org.jclouds.concurrent.config.ExecutorServiceModuleTest.ConfigurableRunner; +import org.jclouds.lifecycle.Closer; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.name.Names; + +/** + * Unit tests for the {@link ScheduledExecutorServiceModule} class. + * + * @author Ignasi Barrera + * + * @see ExecutorServiceModuleTest + */ +@Test(groups = "unit") +public class ScheduledExecutorServiceModuleTest { + + @Test(groups = "unit") + public void testShutdownOnClose() throws IOException { + Injector i = Guice.createInjector(); + + Closer closer = i.getInstance(Closer.class); + ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class); + ExecutorServiceModule.shutdownOnClose(executor, closer); + + expect(executor.shutdownNow()).andReturn(ImmutableList. of()).atLeastOnce(); + + replay(executor); + closer.close(); + + verify(executor); + } + + @Test(groups = "unit") + public void testShutdownOnCloseThroughModule() throws IOException { + + ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() { + @Override + protected void configure() { + bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1); + super.configure(); + } + }; + + Injector i = Guice.createInjector(module); + Closer closer = i.getInstance(Closer.class); + + ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names + .named(Constants.PROPERTY_SCHEDULER_THREADS))); + + assert !sched.isShutdown(); + + closer.close(); + + assert sched.isShutdown(); + } + + @Test(groups = "unit") + public void testDescribedFutureToString() throws Exception { + + ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() { + @Override + protected void configure() { + bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1); + super.configure(); + } + }; + + Injector i = Guice.createInjector(module); + Closer closer = i.getInstance(Closer.class); + + ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names + .named(Constants.PROPERTY_SCHEDULER_THREADS))); + + ConfigurableRunner t1 = new ConfigurableRunner(); + t1.result = "okay"; + + ScheduledFuture esc = performScheduleInSeparateMethod1(sched, t1); + assert esc.toString().indexOf("ConfigurableRunner") >= 0; + assert esc.get().equals("okay"); + + closer.close(); + } + + @Test(groups = "unit") + public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception { + + ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() { + @Override + protected void configure() { + bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1); + super.configure(); + } + }; + + Injector i = Guice.createInjector(module); + Closer closer = i.getInstance(Closer.class); + + ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names + .named(Constants.PROPERTY_SCHEDULER_THREADS))); + + ConfigurableRunner t1 = new ConfigurableRunner(); + t1.failMessage = "foo"; + t1.result = "shouldn't happen"; + + ScheduledFuture esc = performScheduleInSeparateMethod1(sched, t1); + checkFutureGetFailsWith(esc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod1"); + + ScheduledFuture esr = performScheduleInSeparateMethod2(sched, t1); + checkFutureGetFailsWith(esr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod2"); + + ScheduledFuture esfr = performScheduleInSeparateMethod3(sched, t1); + checkFutureGetFailsWith(esfr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod3"); + + ScheduledFuture esfd = performScheduleInSeparateMethod4(sched, t1); + checkFutureGetFailsWith(esfd, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod4"); + + closer.close(); + } + + static ScheduledFuture performScheduleInSeparateMethod1(ScheduledExecutorService sched, ConfigurableRunner t1) { + return sched.schedule((Callable)t1, 0, TimeUnit.SECONDS); + } + + static ScheduledFuture performScheduleInSeparateMethod2(ScheduledExecutorService sched, ConfigurableRunner t1) { + return sched.schedule((Runnable)t1, 0, TimeUnit.SECONDS); + } + + static ScheduledFuture performScheduleInSeparateMethod3(ScheduledExecutorService sched, ConfigurableRunner t1) { + return sched.scheduleAtFixedRate((Runnable)t1, 0, 1, TimeUnit.SECONDS); + } + + static ScheduledFuture performScheduleInSeparateMethod4(ScheduledExecutorService sched, ConfigurableRunner t1) { + return sched.scheduleWithFixedDelay((Runnable)t1, 0, 1, TimeUnit.SECONDS); + } +}