YARN-112. Fixed a race condition during localization that fails containers. Contributed by Omkar Vinit Joshi.
MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. Contributed by Omkar Vinit Joshi. svn merge --ignore-ancestry -c 1466196 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466197 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4f82642185
commit
76d72db5eb
|
@ -138,6 +138,9 @@ Release 2.0.4-alpha - UNRELEASED
|
||||||
MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
|
MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
|
||||||
actual cluster (Siddharth Seth via hitesh)
|
actual cluster (Siddharth Seth via hitesh)
|
||||||
|
|
||||||
|
MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
|
||||||
|
Joshi via vinodkv)
|
||||||
|
|
||||||
Release 2.0.3-alpha - 2013-02-06
|
Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -32,13 +32,13 @@ import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -91,6 +91,9 @@ class LocalDistributedCacheManager {
|
||||||
Map<String, LocalResource> localResources =
|
Map<String, LocalResource> localResources =
|
||||||
new LinkedHashMap<String, LocalResource>();
|
new LinkedHashMap<String, LocalResource>();
|
||||||
MRApps.setupDistributedCache(conf, localResources);
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
// Generating unique numbers for FSDownload.
|
||||||
|
AtomicLong uniqueNumberGenerator =
|
||||||
|
new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
// Find which resources are to be put on the local classpath
|
// Find which resources are to be put on the local classpath
|
||||||
Map<String, Path> classpaths = new HashMap<String, Path>();
|
Map<String, Path> classpaths = new HashMap<String, Path>();
|
||||||
|
@ -128,8 +131,10 @@ class LocalDistributedCacheManager {
|
||||||
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
|
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
|
||||||
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
||||||
for (LocalResource resource : localResources.values()) {
|
for (LocalResource resource : localResources.values()) {
|
||||||
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
|
Callable<Path> download =
|
||||||
destPath, resource, new Random());
|
new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet())),
|
||||||
|
resource);
|
||||||
Future<Path> future = exec.submit(download);
|
Future<Path> future = exec.submit(download);
|
||||||
resourcesToPaths.put(resource, future);
|
resourcesToPaths.put(resource, future);
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,6 +150,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
local directory hits unix file count limits and thus prevent job failures.
|
local directory hits unix file count limits and thus prevent job failures.
|
||||||
(Omkar Vinit Joshi via vinodkv)
|
(Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-112. Fixed a race condition during localization that fails containers.
|
||||||
|
(Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.RunJar;
|
import org.apache.hadoop.util.RunJar;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a single URL to the local disk.
|
* Download a single URL to the local disk.
|
||||||
|
@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
public class FSDownload implements Callable<Path> {
|
public class FSDownload implements Callable<Path> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(FSDownload.class);
|
private static final Log LOG = LogFactory.getLog(FSDownload.class);
|
||||||
|
|
||||||
private Random rand;
|
|
||||||
private FileContext files;
|
private FileContext files;
|
||||||
private final UserGroupInformation userUgi;
|
private final UserGroupInformation userUgi;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
@ -71,13 +68,12 @@ public class FSDownload implements Callable<Path> {
|
||||||
|
|
||||||
|
|
||||||
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
|
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
|
||||||
Path destDirPath, LocalResource resource, Random rand) {
|
Path destDirPath, LocalResource resource) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.destDirPath = destDirPath;
|
this.destDirPath = destDirPath;
|
||||||
this.files = files;
|
this.files = files;
|
||||||
this.userUgi = ugi;
|
this.userUgi = ugi;
|
||||||
this.resource = resource;
|
this.resource = resource;
|
||||||
this.rand = rand;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LocalResource getResource() {
|
LocalResource getResource() {
|
||||||
|
@ -270,11 +266,6 @@ public class FSDownload implements Callable<Path> {
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
throw new IOException("Invalid resource", e);
|
throw new IOException("Invalid resource", e);
|
||||||
}
|
}
|
||||||
Path tmp;
|
|
||||||
do {
|
|
||||||
tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
|
|
||||||
} while (files.util().exists(tmp));
|
|
||||||
destDirPath = tmp;
|
|
||||||
createDir(destDirPath, cachePerms);
|
createDir(destDirPath, cachePerms);
|
||||||
final Path dst_work = new Path(destDirPath + "_tmp");
|
final Path dst_work = new Path(destDirPath + "_tmp");
|
||||||
createDir(dst_work, cachePerms);
|
createDir(dst_work, cachePerms);
|
||||||
|
@ -305,8 +296,6 @@ public class FSDownload implements Callable<Path> {
|
||||||
files.delete(dst_work, true);
|
files.delete(dst_work, true);
|
||||||
} catch (FileNotFoundException ignore) {
|
} catch (FileNotFoundException ignore) {
|
||||||
}
|
}
|
||||||
// clear ref to internal var
|
|
||||||
rand = null;
|
|
||||||
conf = null;
|
conf = null;
|
||||||
resource = null;
|
resource = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.jar.JarOutputStream;
|
import java.util.jar.JarOutputStream;
|
||||||
import java.util.jar.Manifest;
|
import java.util.jar.Manifest;
|
||||||
|
|
||||||
|
@ -66,6 +67,8 @@ import org.junit.Test;
|
||||||
public class TestFSDownload {
|
public class TestFSDownload {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
|
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
|
||||||
|
private static AtomicLong uniqueNumberGenerator =
|
||||||
|
new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void deleteTestDir() throws IOException {
|
public static void deleteTestDir() throws IOException {
|
||||||
|
@ -267,9 +270,11 @@ public class TestFSDownload {
|
||||||
rsrcVis.put(rsrc, vis);
|
rsrcVis.put(rsrc, vis);
|
||||||
Path destPath = dirs.getLocalPathForWrite(
|
Path destPath = dirs.getLocalPathForWrite(
|
||||||
basedir.toString(), size, conf);
|
basedir.toString(), size, conf);
|
||||||
|
destPath = new Path (destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsd =
|
FSDownload fsd =
|
||||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||||
destPath, rsrc, new Random(sharedSeed));
|
destPath, rsrc);
|
||||||
pending.put(rsrc, exec.submit(fsd));
|
pending.put(rsrc, exec.submit(fsd));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -320,9 +325,11 @@ public class TestFSDownload {
|
||||||
rsrcVis.put(rsrc, vis);
|
rsrcVis.put(rsrc, vis);
|
||||||
Path destPath = dirs.getLocalPathForWrite(
|
Path destPath = dirs.getLocalPathForWrite(
|
||||||
basedir.toString(), sizes[i], conf);
|
basedir.toString(), sizes[i], conf);
|
||||||
|
destPath = new Path (destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsd =
|
FSDownload fsd =
|
||||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||||
destPath, rsrc, new Random(sharedSeed));
|
destPath, rsrc);
|
||||||
pending.put(rsrc, exec.submit(fsd));
|
pending.put(rsrc, exec.submit(fsd));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,9 +387,10 @@ public class TestFSDownload {
|
||||||
Path p = new Path(basedir, "" + 1);
|
Path p = new Path(basedir, "" + 1);
|
||||||
LocalResource rsrc = createTarFile(files, p, size, rand, vis);
|
LocalResource rsrc = createTarFile(files, p, size, rand, vis);
|
||||||
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
||||||
|
destPath = new Path (destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsd = new FSDownload(files,
|
FSDownload fsd = new FSDownload(files,
|
||||||
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
|
UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
|
||||||
new Random(sharedSeed));
|
|
||||||
pending.put(rsrc, exec.submit(fsd));
|
pending.put(rsrc, exec.submit(fsd));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -437,9 +445,10 @@ public class TestFSDownload {
|
||||||
LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
|
LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
|
||||||
rsrcjar.setType(LocalResourceType.PATTERN);
|
rsrcjar.setType(LocalResourceType.PATTERN);
|
||||||
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
||||||
|
destPathjar = new Path (destPathjar,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsdjar = new FSDownload(files,
|
FSDownload fsdjar = new FSDownload(files,
|
||||||
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
|
||||||
new Random(sharedSeed));
|
|
||||||
pending.put(rsrcjar, exec.submit(fsdjar));
|
pending.put(rsrcjar, exec.submit(fsdjar));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -493,9 +502,10 @@ public class TestFSDownload {
|
||||||
Path p = new Path(basedir, "" + 1);
|
Path p = new Path(basedir, "" + 1);
|
||||||
LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
|
LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
|
||||||
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
|
||||||
|
destPathjar = new Path (destPathjar,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsdzip = new FSDownload(files,
|
FSDownload fsdzip = new FSDownload(files,
|
||||||
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
|
UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
|
||||||
new Random(sharedSeed));
|
|
||||||
pending.put(rsrczip, exec.submit(fsdzip));
|
pending.put(rsrczip, exec.submit(fsdzip));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -586,9 +596,11 @@ public class TestFSDownload {
|
||||||
rsrcVis.put(rsrc, vis);
|
rsrcVis.put(rsrc, vis);
|
||||||
Path destPath = dirs.getLocalPathForWrite(
|
Path destPath = dirs.getLocalPathForWrite(
|
||||||
basedir.toString(), conf);
|
basedir.toString(), conf);
|
||||||
|
destPath = new Path (destPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
FSDownload fsd =
|
FSDownload fsd =
|
||||||
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||||
destPath, rsrc, new Random(sharedSeed));
|
destPath, rsrc);
|
||||||
pending.put(rsrc, exec.submit(fsd));
|
pending.put(rsrc, exec.submit(fsd));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,4 +626,38 @@ public class TestFSDownload {
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test(timeout = 1000)
|
||||||
|
public void testUniqueDestinationPath() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
|
TestFSDownload.class.getSimpleName()));
|
||||||
|
files.mkdir(basedir, null, true);
|
||||||
|
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
|
||||||
|
|
||||||
|
ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
|
||||||
|
|
||||||
|
LocalDirAllocator dirs =
|
||||||
|
new LocalDirAllocator(TestFSDownload.class.getName());
|
||||||
|
Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf);
|
||||||
|
destPath =
|
||||||
|
new Path(destPath, Long.toString(uniqueNumberGenerator
|
||||||
|
.incrementAndGet()));
|
||||||
|
try {
|
||||||
|
Path p = new Path(basedir, "dir" + 0 + ".jar");
|
||||||
|
LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
|
||||||
|
LocalResource rsrc = createJar(files, p, vis);
|
||||||
|
FSDownload fsd =
|
||||||
|
new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
|
||||||
|
destPath, rsrc);
|
||||||
|
Future<Path> rPath = singleThreadedExec.submit(fsd);
|
||||||
|
// Now FSDownload will not create a random directory to localize the
|
||||||
|
// resource. Therefore the final localizedPath for the resource should be
|
||||||
|
// destination directory (passed as an argument) + file name.
|
||||||
|
Assert.assertEquals(destPath, rPath.get().getParent());
|
||||||
|
} finally {
|
||||||
|
singleThreadedExec.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -198,7 +198,7 @@ public class ContainerLocalizer {
|
||||||
Callable<Path> download(Path path, LocalResource rsrc,
|
Callable<Path> download(Path path, LocalResource rsrc,
|
||||||
UserGroupInformation ugi) throws IOException {
|
UserGroupInformation ugi) throws IOException {
|
||||||
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
|
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
|
||||||
return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
|
return new FSDownload(lfs, ugi, conf, path, rsrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
static long getEstimatedSize(LocalResource rsrc) {
|
static long getEstimatedSize(LocalResource rsrc) {
|
||||||
|
|
|
@ -43,4 +43,5 @@ interface LocalResourcesTracker
|
||||||
// TODO: Remove this in favour of EventHandler.handle
|
// TODO: Remove this in favour of EventHandler.handle
|
||||||
void localizationCompleted(LocalResourceRequest req, boolean success);
|
void localizationCompleted(LocalResourceRequest req, boolean success);
|
||||||
|
|
||||||
|
long nextUniqueNumber();
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.File;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
*/
|
*/
|
||||||
private ConcurrentHashMap<LocalResourceRequest, Path>
|
private ConcurrentHashMap<LocalResourceRequest, Path>
|
||||||
inProgressLocalResourcesMap;
|
inProgressLocalResourcesMap;
|
||||||
|
/*
|
||||||
|
* starting with 10 to accommodate 0-9 directories created as a part of
|
||||||
|
* LocalCacheDirectoryManager. So there will be one unique number generator
|
||||||
|
* per APPLICATION, USER and PUBLIC cache.
|
||||||
|
*/
|
||||||
|
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
|
||||||
|
|
||||||
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
|
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
|
||||||
boolean useLocalCacheDirectoryManager, Configuration conf) {
|
boolean useLocalCacheDirectoryManager, Configuration conf) {
|
||||||
|
@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long nextUniqueNumber() {
|
||||||
|
return uniqueNumberGenerator.incrementAndGet();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -34,7 +34,6 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -666,8 +665,11 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
DiskChecker.checkDir(
|
DiskChecker.checkDir(
|
||||||
new File(publicDirDestPath.toUri().getPath()));
|
new File(publicDirDestPath.toUri().getPath()));
|
||||||
}
|
}
|
||||||
|
publicDirDestPath =
|
||||||
|
new Path(publicDirDestPath, Long.toString(publicRsrc
|
||||||
|
.nextUniqueNumber()));
|
||||||
pending.put(queue.submit(new FSDownload(
|
pending.put(queue.submit(new FSDownload(
|
||||||
lfs, null, conf, publicDirDestPath, resource, new Random())),
|
lfs, null, conf, publicDirDestPath, resource)),
|
||||||
request);
|
request);
|
||||||
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
|
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -955,9 +957,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
Path dirPath =
|
Path dirPath =
|
||||||
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
||||||
ContainerLocalizer.getEstimatedSize(rsrc), false);
|
ContainerLocalizer.getEstimatedSize(rsrc), false);
|
||||||
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
|
dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
|
||||||
dirPath);
|
dirPath);
|
||||||
|
return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -520,7 +520,10 @@ public class TestResourceLocalizationService {
|
||||||
new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
|
new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
|
||||||
URL localizedPath =
|
URL localizedPath =
|
||||||
response.getResourceSpecs().get(0).getDestinationDirectory();
|
response.getResourceSpecs().get(0).getDestinationDirectory();
|
||||||
assertTrue(localizedPath.getFile().endsWith(localPath));
|
// Appending to local path unique number(10) generated as a part of
|
||||||
|
// LocalResourcesTracker
|
||||||
|
assertTrue(localizedPath.getFile().endsWith(
|
||||||
|
localPath + Path.SEPARATOR + "10"));
|
||||||
|
|
||||||
// get second resource
|
// get second resource
|
||||||
response = spyService.heartbeat(stat);
|
response = spyService.heartbeat(stat);
|
||||||
|
@ -534,7 +537,7 @@ public class TestResourceLocalizationService {
|
||||||
// LocalCacheDirectoryManager will be used and we have restricted number
|
// LocalCacheDirectoryManager will be used and we have restricted number
|
||||||
// of files per directory to 1.
|
// of files per directory to 1.
|
||||||
assertTrue(localizedPath.getFile().endsWith(
|
assertTrue(localizedPath.getFile().endsWith(
|
||||||
localPath + Path.SEPARATOR + "0"));
|
localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
|
||||||
|
|
||||||
// empty rsrc
|
// empty rsrc
|
||||||
response = spyService.heartbeat(stat);
|
response = spyService.heartbeat(stat);
|
||||||
|
|
Loading…
Reference in New Issue