diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java new file mode 100644 index 00000000000..3bc9ed9ea92 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java @@ -0,0 +1,67 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** Helper functions for Executors. */ +public final class ExecutorHelper { + + private static final Log LOG = LogFactory + .getLog(ExecutorHelper.class); + + static void logThrowableFromAfterExecute(Runnable r, Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("afterExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + + //For additional information, see: https://docs.oracle + // .com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor + // .html#afterExecute(java.lang.Runnable,%20java.lang.Throwable) . + + if (t == null && r instanceof Future) { + try { + ((Future) r).get(); + } catch (ExecutionException ee) { + LOG.warn("Execution exception when running task in " + + Thread.currentThread().getName()); + t = ee.getCause(); + } catch (InterruptedException ie) { + LOG.warn("Thread (" + Thread.currentThread() + ") interrupted: ", ie); + Thread.currentThread().interrupt(); + } catch (Throwable throwable) { + t = throwable; + } + } + + if (t != null) { + LOG.warn("Caught exception in thread " + Thread + .currentThread().getName() + ": ", t); + } + } + + private ExecutorHelper() {} +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java new file mode 100644 index 00000000000..1bc6976b195 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java @@ -0,0 +1,96 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + +/** Factory methods for ExecutorService, ScheduledExecutorService instances. + * These executor service instances provide additional functionality (e.g + * logging uncaught exceptions). */ +public final class HadoopExecutors { + public static ExecutorService newCachedThreadPool(ThreadFactory + threadFactory) { + return new HadoopThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + + public static ExecutorService newFixedThreadPool(int nThreads, + ThreadFactory threadFactory) { + return new HadoopThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + public static ExecutorService newFixedThreadPool(int nThreads) { + return new HadoopThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + //Executors.newSingleThreadExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here. + public static ExecutorService newSingleThreadExecutor() { + return Executors.newSingleThreadExecutor(); + } + + //Executors.newSingleThreadExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here. + public static ExecutorService newSingleThreadExecutor(ThreadFactory + threadFactory) { + return Executors.newSingleThreadExecutor(threadFactory); + } + + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize) { + return new HadoopScheduledThreadPoolExecutor(corePoolSize); + } + + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new HadoopScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + + //Executors.newSingleThreadScheduledExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here + public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return Executors.newSingleThreadScheduledExecutor(); + } + + //Executors.newSingleThreadScheduledExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here + public static ScheduledExecutorService newSingleThreadScheduledExecutor( + ThreadFactory threadFactory) { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + //disable instantiation + private HadoopExecutors() { } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java new file mode 100644 index 00000000000..8d910b6ccab --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java @@ -0,0 +1,71 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +/** An extension of ScheduledThreadPoolExecutor that provides additional + * functionality. */ +public class HadoopScheduledThreadPoolExecutor extends + ScheduledThreadPoolExecutor { + + private static final Log LOG = LogFactory + .getLog(HadoopScheduledThreadPoolExecutor.class); + + public HadoopScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + RejectedExecutionHandler handler) { + super(corePoolSize, handler); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + if (LOG.isDebugEnabled()) { + LOG.debug("beforeExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + ExecutorHelper.logThrowableFromAfterExecute(r, t); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java new file mode 100644 index 00000000000..bcf26cb17c7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java @@ -0,0 +1,92 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** An extension of ThreadPoolExecutor that provides additional functionality. + * */ +public final class HadoopThreadPoolExecutor extends ThreadPoolExecutor { + + private static final Log LOG = LogFactory + .getLog(HadoopThreadPoolExecutor.class); + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + handler); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, handler); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + if (LOG.isDebugEnabled()) { + LOG.debug("beforeExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + ExecutorHelper.logThrowableFromAfterExecute(r, t); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java new file mode 100644 index 00000000000..2effb65872e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java @@ -0,0 +1,26 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.util.concurrent; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability;