diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a5dfe24ff26..2f777a535d3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -493,6 +493,9 @@ Release 2.7.0 - UNRELEASED YARN-2971. RM uses conf instead of token service address to renew timeline delegation tokens (jeagles) + YARN-3090. DeletionService can silently ignore deletion task failures + (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index e4025f5da1d..4e00a1cc265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -29,6 +29,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -113,13 +115,13 @@ public class DeletionService extends AbstractService { .setNameFormat("DeletionService #%d") .build(); if (conf != null) { - sched = new ScheduledThreadPoolExecutor( - conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), - tf); + sched = new DelServiceSchedThreadPoolExecutor( + conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { - sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, - tf); + sched = new DelServiceSchedThreadPoolExecutor( + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); @@ -155,6 +157,34 @@ public class DeletionService extends AbstractService { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } + private static class DelServiceSchedThreadPoolExecutor extends + ScheduledThreadPoolExecutor { + public DelServiceSchedThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + @Override + protected void afterExecute(Runnable task, Throwable exception) { + if (task instanceof FutureTask) { + FutureTask futureTask = (FutureTask) task; + if (!futureTask.isCancelled()) { + try { + futureTask.get(); + } catch (ExecutionException ee) { + exception = ee.getCause(); + } catch (InterruptedException ie) { + exception = ie; + } + } + } + if (exception != null) { + LOG.error("Exception during execution of task in DeletionService", + exception); + } + } + } + public static class FileDeletionTask implements Runnable { public static final int INVALID_TASK_ID = -1; private int taskId;