Merge -r 1232980:1232981 from trunk to branch-0.23. Fixes: MAPREDUCE-3684
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1232982 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5f2f884a34
commit
76ef392a2e
|
@ -455,6 +455,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
|
MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both
|
||||||
delegation tokens and kerberos. (mahadev via acmurthy)
|
delegation tokens and kerberos. (mahadev via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread
|
||||||
|
pool (tomwhite)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.FSDownload;
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A helper class for managing the distributed cache for {@link LocalJobRunner}.
|
* A helper class for managing the distributed cache for {@link LocalJobRunner}.
|
||||||
*/
|
*/
|
||||||
|
@ -111,43 +114,52 @@ class LocalDistributedCacheManager {
|
||||||
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
|
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
ExecutorService exec = null;
|
||||||
ExecutorService exec = Executors.newCachedThreadPool();
|
try {
|
||||||
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
|
ThreadFactory tf = new ThreadFactoryBuilder()
|
||||||
for (LocalResource resource : localResources.values()) {
|
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
|
||||||
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
|
.build();
|
||||||
destPath, resource, new Random());
|
exec = Executors.newCachedThreadPool(tf);
|
||||||
Future<Path> future = exec.submit(download);
|
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
|
||||||
resourcesToPaths.put(resource, future);
|
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
||||||
}
|
for (LocalResource resource : localResources.values()) {
|
||||||
for (LocalResource resource : localResources.values()) {
|
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
|
||||||
Path path;
|
destPath, resource, new Random());
|
||||||
try {
|
Future<Path> future = exec.submit(download);
|
||||||
path = resourcesToPaths.get(resource).get();
|
resourcesToPaths.put(resource, future);
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
}
|
||||||
String pathString = path.toUri().toString();
|
for (LocalResource resource : localResources.values()) {
|
||||||
if (resource.getType() == LocalResourceType.ARCHIVE) {
|
Path path;
|
||||||
localArchives.add(pathString);
|
try {
|
||||||
} else if (resource.getType() == LocalResourceType.FILE) {
|
path = resourcesToPaths.get(resource).get();
|
||||||
localFiles.add(pathString);
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
String pathString = path.toUri().toString();
|
||||||
|
if (resource.getType() == LocalResourceType.ARCHIVE) {
|
||||||
|
localArchives.add(pathString);
|
||||||
|
} else if (resource.getType() == LocalResourceType.FILE) {
|
||||||
|
localFiles.add(pathString);
|
||||||
|
}
|
||||||
|
Path resourcePath;
|
||||||
|
try {
|
||||||
|
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
LOG.info(String.format("Localized %s as %s", resourcePath, path));
|
||||||
|
String cp = resourcePath.toUri().getPath();
|
||||||
|
if (classpaths.keySet().contains(cp)) {
|
||||||
|
localClasspaths.add(path.toUri().getPath().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Path resourcePath;
|
} finally {
|
||||||
try {
|
if (exec != null) {
|
||||||
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
|
exec.shutdown();
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
LOG.info(String.format("Localized %s as %s", resourcePath, path));
|
|
||||||
String cp = resourcePath.toUri().getPath();
|
|
||||||
if (classpaths.keySet().contains(cp)) {
|
|
||||||
localClasspaths.add(path.toUri().getPath().toString());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the configuration object with localized data.
|
// Update the configuration object with localized data.
|
||||||
if (!localArchives.isEmpty()) {
|
if (!localArchives.isEmpty()) {
|
||||||
conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
|
conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -61,6 +62,8 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/** Implements MapReduce locally, in-process, for debugging. */
|
/** Implements MapReduce locally, in-process, for debugging. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
@ -302,7 +305,10 @@ public class LocalJobRunner implements ClientProtocol {
|
||||||
LOG.debug("Map tasks to process: " + this.numMapTasks);
|
LOG.debug("Map tasks to process: " + this.numMapTasks);
|
||||||
|
|
||||||
// Create a new executor service to drain the work queue.
|
// Create a new executor service to drain the work queue.
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
|
ThreadFactory tf = new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat("LocalJobRunner Map Task Executor #%d")
|
||||||
|
.build();
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
|
||||||
|
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue