From 8b2f6909ec7df5cffb5ef417f5c9cffdee43e38a Mon Sep 17 00:00:00 2001
From: Thomas White
Date: Wed, 18 Jan 2012 18:20:13 +0000
Subject: [PATCH] MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread pool.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232981 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  5 +-
 .../mapred/LocalDistributedCacheManager.java  | 82 +++++++++++--------
 .../apache/hadoop/mapred/LocalJobRunner.java  |  8 +-
 3 files changed, 58 insertions(+), 37 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d1a23439734..9f2e1ba2637 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -507,7 +507,10 @@ Release 0.23.1 - Unreleased
     leading to a long timeout in Task. (Rajesh Balamohan via acmurthy)
 
     MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
-    delegation tokens and kerberos. (mahadev via acmurthy)
+    delegation tokens and kerberos. (mahadev via acmurthy)
+
+    MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread
+    pool (tomwhite)
 
 Release 0.23.0 - 2011-11-01
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 14d8644e6e0..2fec8147e3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * A helper class for managing the distributed cache for {@link LocalJobRunner}.
  */
@@ -111,43 +114,52 @@ class LocalDistributedCacheManager {
     FileContext localFSFileContext = FileContext.getLocalFSFileContext();
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-    Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
-    ExecutorService exec = Executors.newCachedThreadPool();
-    Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
-    for (LocalResource resource : localResources.values()) {
-      Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
-          destPath, resource, new Random());
-      Future<Path> future = exec.submit(download);
-      resourcesToPaths.put(resource, future);
-    }
-    for (LocalResource resource : localResources.values()) {
-      Path path;
-      try {
-        path = resourcesToPaths.get(resource).get();
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
+    ExecutorService exec = null;
+    try {
+      ThreadFactory tf = new ThreadFactoryBuilder()
+          .setNameFormat("LocalDistributedCacheManager Downloader #%d")
+          .build();
+      exec = Executors.newCachedThreadPool(tf);
+      Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
+      Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
+      for (LocalResource resource : localResources.values()) {
+        Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
+            destPath, resource, new Random());
+        Future<Path> future = exec.submit(download);
+        resourcesToPaths.put(resource, future);
       }
-      String pathString = path.toUri().toString();
-      if (resource.getType() == LocalResourceType.ARCHIVE) {
-        localArchives.add(pathString);
-      } else if (resource.getType() == LocalResourceType.FILE) {
-        localFiles.add(pathString);
+      for (LocalResource resource : localResources.values()) {
+        Path path;
+        try {
+          path = resourcesToPaths.get(resource).get();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        }
+        String pathString = path.toUri().toString();
+        if (resource.getType() == LocalResourceType.ARCHIVE) {
+          localArchives.add(pathString);
+        } else if (resource.getType() == LocalResourceType.FILE) {
+          localFiles.add(pathString);
+        }
+        Path resourcePath;
+        try {
+          resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+        } catch (URISyntaxException e) {
+          throw new IOException(e);
+        }
+        LOG.info(String.format("Localized %s as %s", resourcePath, path));
+        String cp = resourcePath.toUri().getPath();
+        if (classpaths.keySet().contains(cp)) {
+          localClasspaths.add(path.toUri().getPath().toString());
+        }
       }
-      Path resourcePath;
-      try {
-        resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
+    } finally {
+      if (exec != null) {
+        exec.shutdown();
       }
-      LOG.info(String.format("Localized %s as %s", resourcePath, path));
-      String cp = resourcePath.toUri().getPath();
-      if (classpaths.keySet().contains(cp)) {
-        localClasspaths.add(path.toUri().getPath().toString());
-      }
-    }
-
+    }
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
       conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
@@ -171,7 +183,7 @@ class LocalDistributedCacheManager {
     }
     setupCalled = true;
   }
-  
+
   /**
    * Are the resources that should be added to the classpath?
    * Should be called after setup().
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 7fe5b99aeb7..3756b98df4e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,6 +62,8 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -302,7 +305,10 @@ public class LocalJobRunner implements ClientProtocol {
       LOG.debug("Map tasks to process: " + this.numMapTasks);
 
       // Create a new executor service to drain the work queue.
-      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
+      ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("LocalJobRunner Map Task Executor #%d")
+        .build();
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
       return executor;
     }
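
Reviewer note: the snippet below is a minimal, self-contained sketch of the pattern both
files adopt in this patch: build the pool with a Guava ThreadFactoryBuilder so its threads
carry a recognizable name, and call shutdown() in a finally block so the pool's worker
threads are always released. The class name, thread-name format, and dummy task here are
illustrative only and are not part of the patch.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class NamedPoolShutdownExample {  // hypothetical example class
      public static void main(String[] args) throws Exception {
        ExecutorService exec = null;
        try {
          // Threads are named "Example Downloader #0", "#1", ..., which makes them
          // easy to spot in thread dumps; the patch uses analogous name formats.
          ThreadFactory tf = new ThreadFactoryBuilder()
              .setNameFormat("Example Downloader #%d")
              .build();
          exec = Executors.newCachedThreadPool(tf);
          // Submit work and block for the result, as setup() does per resource.
          exec.submit(new Runnable() {
            public void run() {
              System.out.println(Thread.currentThread().getName());
            }
          }).get();
        } finally {
          if (exec != null) {
            exec.shutdown();  // lets submitted tasks finish, then frees the threads
          }
        }
      }
    }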