YARN-112. Fixed a race condition during localization that fails containers. Contributed by Omkar Vinit Joshi.

MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. Contributed by Omkar Vinit Joshi.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466196 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-09 19:56:10 +00:00
parent d9593621b5
commit 3a54a5653b
10 changed files with 98 additions and 34 deletions

View File

@ -225,6 +225,9 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is
submitting a job (Daryn Sharp via cos)
MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
Joshi via vinodkv)
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES

View File

@ -32,13 +32,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -91,6 +91,9 @@ class LocalDistributedCacheManager {
Map<String, LocalResource> localResources =
new LinkedHashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
// Generating unique numbers for FSDownload.
AtomicLong uniqueNumberGenerator =
new AtomicLong(System.currentTimeMillis());
// Find which resources are to be put on the local classpath
Map<String, Path> classpaths = new HashMap<String, Path>();
@ -128,8 +131,10 @@ class LocalDistributedCacheManager {
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
destPath, resource, new Random());
Callable<Path> download =
new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
Long.toString(uniqueNumberGenerator.incrementAndGet())),
resource);
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}

View File

@ -208,6 +208,9 @@ Release 2.0.5-beta - UNRELEASED
local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv)
YARN-112. Fixed a race condition during localization that fails containers.
(Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Download a single URL to the local disk.
@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
public class FSDownload implements Callable<Path> {
private static final Log LOG = LogFactory.getLog(FSDownload.class);
private Random rand;
private FileContext files;
private final UserGroupInformation userUgi;
private Configuration conf;
@ -71,13 +68,12 @@ public class FSDownload implements Callable<Path> {
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource, Random rand) {
Path destDirPath, LocalResource resource) {
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
this.rand = rand;
}
LocalResource getResource() {
@ -270,11 +266,6 @@ public class FSDownload implements Callable<Path> {
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
Path tmp;
do {
tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
} while (files.util().exists(tmp));
destDirPath = tmp;
createDir(destDirPath, cachePerms);
final Path dst_work = new Path(destDirPath + "_tmp");
createDir(dst_work, cachePerms);
@ -305,8 +296,6 @@ public class FSDownload implements Callable<Path> {
files.delete(dst_work, true);
} catch (FileNotFoundException ignore) {
}
// clear ref to internal var
rand = null;
conf = null;
resource = null;
}

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
@ -66,6 +67,8 @@ import org.junit.Test;
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
private static AtomicLong uniqueNumberGenerator =
new AtomicLong(System.currentTimeMillis());
@AfterClass
public static void deleteTestDir() throws IOException {
@ -267,9 +270,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), size, conf);
destPath = new Path (destPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
destPath, rsrc, new Random(sharedSeed));
destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@ -320,9 +325,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), sizes[i], conf);
destPath = new Path (destPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
destPath, rsrc, new Random(sharedSeed));
destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@ -380,9 +387,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrc = createTarFile(files, p, size, rand, vis);
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
destPath = new Path (destPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd = new FSDownload(files,
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
new Random(sharedSeed));
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
try {
@ -437,9 +445,10 @@ public class TestFSDownload {
LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
rsrcjar.setType(LocalResourceType.PATTERN);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
destPathjar = new Path (destPathjar,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdjar = new FSDownload(files,
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
new Random(sharedSeed));
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
pending.put(rsrcjar, exec.submit(fsdjar));
try {
@ -493,9 +502,10 @@ public class TestFSDownload {
Path p = new Path(basedir, "" + 1);
LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
destPathjar = new Path (destPathjar,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsdzip = new FSDownload(files,
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
new Random(sharedSeed));
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
pending.put(rsrczip, exec.submit(fsdzip));
try {
@ -586,9 +596,11 @@ public class TestFSDownload {
rsrcVis.put(rsrc, vis);
Path destPath = dirs.getLocalPathForWrite(
basedir.toString(), conf);
destPath = new Path (destPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
destPath, rsrc, new Random(sharedSeed));
destPath, rsrc);
pending.put(rsrc, exec.submit(fsd));
}
@ -614,4 +626,38 @@ public class TestFSDownload {
}
}
@Test(timeout = 1000)
public void testUniqueDestinationPath() throws Exception {
Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf);
destPath =
new Path(destPath, Long.toString(uniqueNumberGenerator
.incrementAndGet()));
try {
Path p = new Path(basedir, "dir" + 0 + ".jar");
LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
LocalResource rsrc = createJar(files, p, vis);
FSDownload fsd =
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
destPath, rsrc);
Future<Path> rPath = singleThreadedExec.submit(fsd);
// Now FSDownload will not create a random directory to localize the
// resource. Therefore the final localizedPath for the resource should be
// destination directory (passed as an argument) + file name.
Assert.assertEquals(destPath, rPath.get().getParent());
} finally {
singleThreadedExec.shutdown();
}
}
}

View File

@ -198,7 +198,7 @@ public class ContainerLocalizer {
Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
return new FSDownload(lfs, ugi, conf, path, rsrc);
}
static long getEstimatedSize(LocalResource rsrc) {

View File

@ -43,4 +43,5 @@ interface LocalResourcesTracker
// TODO: Remove this in favour of EventHandler.handle
void localizationCompleted(LocalResourceRequest req, boolean success);
long nextUniqueNumber();
}

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
*/
private ConcurrentHashMap<LocalResourceRequest, Path>
inProgressLocalResourcesMap;
/*
* starting with 10 to accommodate 0-9 directories created as a part of
* LocalCacheDirectoryManager. So there will be one unique number generator
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
boolean useLocalCacheDirectoryManager, Configuration conf) {
@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
}
}
}
@Override
public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet();
}
}

View File

@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@ -666,8 +665,11 @@ public class ResourceLocalizationService extends CompositeService
DiskChecker.checkDir(
new File(publicDirDestPath.toUri().getPath()));
}
publicDirDestPath =
new Path(publicDirDestPath, Long.toString(publicRsrc
.nextUniqueNumber()));
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
lfs, null, conf, publicDirDestPath, resource)),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) {
@ -955,9 +957,9 @@ public class ResourceLocalizationService extends CompositeService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath);
return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
}
@Override

View File

@ -520,7 +520,10 @@ public class TestResourceLocalizationService {
new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
URL localizedPath =
response.getResourceSpecs().get(0).getDestinationDirectory();
assertTrue(localizedPath.getFile().endsWith(localPath));
// Appending to local path unique number(10) generated as a part of
// LocalResourcesTracker
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "10"));
// get second resource
response = spyService.heartbeat(stat);
@ -534,7 +537,7 @@ public class TestResourceLocalizationService {
// LocalCacheDirectoryManager will be used and we have restricted number
// of files per directory to 1.
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "0"));
localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
// empty rsrc
response = spyService.heartbeat(stat);