YARN-547. Fixed race conditions in public and private resource localization which used to cause duplicate downloads. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1470076 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1470078 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-19 22:36:52 +00:00
parent 6c37becf87
commit a02788f864
8 changed files with 597 additions and 120 deletions

View File

@ -201,6 +201,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication
caused by YARN-514. (Zhijie Shen via vinodkv)
YARN-547. Fixed race conditions in public and private resource localization
which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import com.google.common.annotations.VisibleForTesting;
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
@ -41,4 +44,8 @@ interface LocalResourcesTracker
String getUser();
long nextUniqueNumber();
@VisibleForTesting
@Private
LocalizedResource getLocalizedResource(LocalResourceRequest request);
}

View File

@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import com.google.common.annotations.VisibleForTesting;
/**
* A collection of {@link LocalizedResource}s all of same
@ -307,4 +310,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet();
}
@VisibleForTesting
@Private
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
}

View File

@ -78,27 +78,20 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
.addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchDirectTransition())
.addTransition(ResourceState.INIT, ResourceState.INIT,
ResourceEventType.RELEASE, new ReleaseTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchSuccessTransition())
.addTransition(ResourceState.DOWNLOADING,
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
ResourceEventType.RELEASE, new ReleasePendingTransition())
.addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING,
ResourceEventType.RELEASE, new ReleaseTransition())
.addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
// From LOCALIZED (ref >= 0, on disk)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.REQUEST, new LocalizedResourceTransition())
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.RELEASE, new ReleaseTransition())
.installTopology();
@ -230,14 +223,6 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
}
}
private static class FetchDirectTransition extends FetchSuccessTransition {
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
LOG.warn("Resource " + rsrc + " localized without listening container");
super.transition(rsrc, event);
}
}
/**
* Resource localized, notify waiting containers.
*/
@ -304,17 +289,4 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
rsrc.release(relEvent.getContainer());
}
}
private static class ReleasePendingTransition implements
MultipleArcTransition<LocalizedResource,ResourceEvent,ResourceState> {
@Override
public ResourceState transition(LocalizedResource rsrc,
ResourceEvent event) {
ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
rsrc.release(relEvent.getContainer());
return rsrc.ref.isEmpty()
? ResourceState.INIT
: ResourceState.DOWNLOADING;
}
}
}

View File

@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
@ -492,7 +494,25 @@ public class ResourceLocalizationService extends CompositeService
+ Path.SEPARATOR + appId;
return path;
}
@VisibleForTesting
@Private
public PublicLocalizer getPublicLocalizer() {
return localizerTracker.publicLocalizer;
}
@VisibleForTesting
@Private
public LocalizerRunner getLocalizerRunner(String locId) {
return localizerTracker.privLocalizers.get(locId);
}
@VisibleForTesting
@Private
public Map<String, LocalizerRunner> getPrivateLocalizers() {
return localizerTracker.privLocalizers;
}
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@ -606,41 +626,20 @@ public class ResourceLocalizationService extends CompositeService
final ExecutorService threadPool;
final CompletionService<Path> queue;
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
// TODO hack to work around broken signaling
final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending,
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
super("Public Localizer");
this.lfs = lfs;
this.conf = conf;
this.pending = pending;
this.attempts = attempts;
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// for (int i = 0, n = localDirs.size(); i < n; ++i) {
// publicFilecache[i] =
// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
// }
// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// int i = 0;
// for (String localDir : localDirs) {
// publicFilecache[i++] =
// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
// }
this.threadPool = threadPool;
this.queue = new ExecutorCompletionService<Path>(threadPool);
@ -648,36 +647,45 @@ public class ResourceLocalizationService extends CompositeService
public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers
LocalResourceRequest key = request.getResource().getRequest();
LocalizedResource rsrc = request.getResource();
LocalResourceRequest key = rsrc.getRequest();
LOG.info("Downloading public rsrc:" + key);
synchronized (attempts) {
List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
if (null == sigh) {
/*
* Here multiple containers may request the same resource. So we need
* to start downloading only when
* 1) ResourceState == DOWNLOADING
* 2) We are able to acquire non blocking semaphore lock.
* If not we will skip this resource as either it is getting downloaded
* or it FAILED / LOCALIZED.
*/
if (rsrc.tryAcquire()) {
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path publicDirDestPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path hierarchicalPath =
publicRsrc.getPathForLocalization(key, publicDirDestPath);
publicRsrc.getPathForLocalization(key, publicDirDestPath);
if (!hierarchicalPath.equals(publicDirDestPath)) {
publicDirDestPath = hierarchicalPath;
DiskChecker.checkDir(
new File(publicDirDestPath.toUri().getPath()));
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
publicDirDestPath =
new Path(publicDirDestPath, Long.toString(publicRsrc
.nextUniqueNumber()));
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource)),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
pending.put(queue.submit(new FSDownload(lfs, null, conf,
publicDirDestPath, resource)), request);
} catch (IOException e) {
rsrc.unlock();
// TODO Need to Fix IO Exceptions - Notifying resource
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
}
} else {
sigh.add(request);
rsrc.unlock();
}
}
}
@ -700,24 +708,14 @@ public class ResourceLocalizationService extends CompositeService
LocalResourceRequest key = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
.getDU(new File(local.toUri()))));
synchronized (attempts) {
attempts.remove(key);
}
assoc.getResource().unlock();
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
.getCause()));
synchronized (attempts) {
List<LocalizerResourceRequestEvent> reqs;
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
return;
}
attempts.remove(req);
}
assoc.getResource().unlock();
} catch (CancellationException e) {
// ignore; shutting down
}
@ -776,22 +774,35 @@ public class ResourceLocalizationService extends CompositeService
i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next();
LocalizedResource nRsrc = evt.getResource();
if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
// Resource download should take place ONLY if resource is in
// Downloading state
if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
i.remove();
continue;
}
/*
* Multiple containers will try to download the same resource. So the
* resource download should start only if
* 1) We can acquire a non blocking semaphore lock on resource
* 2) Resource is still in DOWNLOADING state
*/
if (nRsrc.tryAcquire()) {
LocalResourceRequest nextRsrc = nRsrc.getRequest();
LocalResource next =
recordFactory.newRecordInstance(LocalResource.class);
next.setResource(
ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
next.setTimestamp(nextRsrc.getTimestamp());
next.setType(nextRsrc.getType());
next.setVisibility(evt.getVisibility());
next.setPattern(evt.getPattern());
scheduled.put(nextRsrc, evt);
return next;
if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResourceRequest nextRsrc = nRsrc.getRequest();
LocalResource next =
recordFactory.newRecordInstance(LocalResource.class);
next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
.getPath()));
next.setTimestamp(nextRsrc.getTimestamp());
next.setType(nextRsrc.getType());
next.setVisibility(evt.getVisibility());
next.setPattern(evt.getPattern());
scheduled.put(nextRsrc, evt);
return next;
} else {
// Need to release acquired lock
nRsrc.unlock();
}
}
}
return null;
@ -863,6 +874,12 @@ public class ResourceLocalizationService extends CompositeService
new ResourceLocalizedEvent(req, ConverterUtils
.getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
} catch (URISyntaxException e) { }
// unlocking the resource and removing it from scheduled resource
// list
assoc.getResource().unlock();
scheduled.remove(req);
if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@ -889,11 +906,16 @@ public class ResourceLocalizationService extends CompositeService
break;
case FETCH_FAILURE:
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(
new ResourceFailedLocalizationEvent(req, stat.getException()));
// unlocking the resource and removing it from scheduled resource
// list
assoc.getResource().unlock();
scheduled.remove(req);
break;
default:
LOG.info("Unknown status: " + stat.getStatus());

View File

@ -129,14 +129,10 @@ public class TestLocalResourcesTrackerImpl {
dispatcher.await();
verifyTrackedResourceCount(tracker, 2);
// Verify resources in state INIT with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr2, mockDelService));
verifyTrackedResourceCount(tracker, 1);
// Verify resource with non zero ref count is not removed.
Assert.assertEquals(2, lr1.getRefCount());
Assert.assertFalse(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 1);
verifyTrackedResourceCount(tracker, 2);
// Localize resource1
ResourceLocalizedEvent rle =
@ -151,7 +147,7 @@ public class TestLocalResourcesTrackerImpl {
// Verify resources in state LOCALIZED with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 0);
verifyTrackedResourceCount(tracker, 1);
} finally {
if (dispatcher != null) {
dispatcher.stop();

View File

@ -117,7 +117,7 @@ public class TestLocalizedResource {
local.handle(new ResourceReleaseEvent(rsrcA, container1));
dispatcher.await();
verify(containerBus, never()).handle(isA(ContainerEvent.class));
assertEquals(ResourceState.INIT, local.getState());
assertEquals(ResourceState.DOWNLOADING, local.getState());
// Register C2, C3
final ContainerId container2 = getMockContainer(2);
@ -176,24 +176,6 @@ public class TestLocalizedResource {
}
}
@Test
public void testDirectLocalization() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(new Configuration());
try {
dispatcher.start();
LocalResource apiRsrc = createMockResource();
LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
Path p = new Path("file:///cache/rsrcA");
local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
dispatcher.await();
assertEquals(ResourceState.LOCALIZED, local.getState());
} finally {
dispatcher.stop();
}
}
static LocalResource createMockResource() {
// mock rsrc location
org.apache.hadoop.yarn.api.records.URL uriA =

View File

@ -34,9 +34,9 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -53,6 +53,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import junit.framework.Assert;
@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAc
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
@ -677,6 +687,481 @@ public class TestResourceLocalizationService {
}
}
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
DrainDispatcher dispatcher1 = null;
try {
dispatcher1 = new DrainDispatcher();
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
// mocked Resource Localization Service
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
rls.handle(createApplicationLocalizationEvent(user, appId));
LocalResourceRequest req =
new LocalResourceRequest(new Path("file:///tmp"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
// We need to pre-populate the LocalizerRunner as the
// Resource Localization Service code internally starts them which
// definitely we don't want.
// creating new containers and populating corresponding localizer runners
// Container - 1
ContainerImpl container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString();
rls.getPrivateLocalizers().put(
localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1));
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container1,
LocalResourceVisibility.PRIVATE, req));
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200));
// Container - 2 now makes the request.
ContainerImpl container2 = createMockContainer(user, 2);
String localizerId2 = container2.getContainerID().toString();
rls.getPrivateLocalizers().put(
localizerId2,
rls.new LocalizerRunner(new LocalizerContext(user, container2
.getContainerID(), null), localizerId2));
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2,
LocalResourceVisibility.PRIVATE, req));
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200));
// Retrieving localized resource.
LocalResourcesTracker tracker =
rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user,
appId);
LocalizedResource lr = tracker.getLocalizedResource(req);
// Resource would now have moved into DOWNLOADING state
Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
// Resource should have one permit
Assert.assertEquals(1, lr.sem.availablePermits());
// Resource Localization Service receives first heart beat from
// ContainerLocalizer for container1
LocalizerHeartbeatResponse response1 =
rls.heartbeat(createLocalizerStatus(localizerId1));
// Resource must have been added to scheduled map
Assert.assertEquals(1, localizerRunner1.scheduled.size());
// Checking resource in the response and also available permits for it.
Assert.assertEquals(req.getResource(), response1.getResourceSpecs()
.get(0).getResource().getResource());
Assert.assertEquals(0, lr.sem.availablePermits());
// Resource Localization Service now receives first heart beat from
// ContainerLocalizer for container2
LocalizerHeartbeatResponse response2 =
rls.heartbeat(createLocalizerStatus(localizerId2));
// Resource must not have been added to scheduled map
Assert.assertEquals(0, localizerRunner2.scheduled.size());
// No resource is returned in response
Assert.assertEquals(0, response2.getResourceSpecs().size());
// ContainerLocalizer - 1 now sends failed resource heartbeat.
rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req));
// Resource Localization should fail and state is modified accordingly.
// Also Local should be release on the LocalizedResource.
Assert
.assertTrue(waitForResourceState(lr, rls, req,
LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED,
200));
Assert.assertTrue(lr.getState().equals(ResourceState.FAILED));
Assert.assertEquals(0, localizerRunner1.scheduled.size());
// Now Container-2 once again sends heart beat to resource localization
// service
// Now container-2 again try to download the resource it should still
// not get the resource as the resource is now not in DOWNLOADING state.
response2 = rls.heartbeat(createLocalizerStatus(localizerId2));
// Resource must not have been added to scheduled map.
// Also as the resource has failed download it will be removed from
// pending list.
Assert.assertEquals(0, localizerRunner2.scheduled.size());
Assert.assertEquals(0, localizerRunner2.pending.size());
Assert.assertEquals(0, response2.getResourceSpecs().size());
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private LocalizerStatus createLocalizerStatusForFailedResource(
String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId);
LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
resourceStatus.setException(new YarnRemoteExceptionPBImpl("test"));
resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
resourceStatus.setResource(req);
status.addResourceStatus(resourceStatus);
return status;
}
private LocalizerStatus createLocalizerStatus(String localizerId1) {
LocalizerStatus status = new LocalizerStatusPBImpl();
status.setLocalizerId(localizerId1);
return status;
}
private LocalizationEvent createApplicationLocalizationEvent(String user,
ApplicationId appId) {
Application app = mock(Application.class);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
return new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app);
}
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testParallelDownloadAttemptsForPublicResource() throws Exception {
DrainDispatcher dispatcher1 = null;
String user = "testuser";
try {
// Setting up ResourceLocalization service.
Configuration conf = new Configuration();
dispatcher1 = new DrainDispatcher();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
// Creating and initializing ResourceLocalizationService but not starting
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
// Initially pending map should be empty for public localizer
Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size());
LocalResourceRequest req =
new LocalResourceRequest(new Path("/tmp"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");
// Initializing application
ApplicationImpl app = mock(ApplicationImpl.class);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
when(app.getAppId()).thenReturn(appId);
when(app.getUser()).thenReturn(user);
dispatcher1.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
// Container - 1
// container requesting the resource
ContainerImpl container1 = createMockContainer(user, 1);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container1,
LocalResourceVisibility.PUBLIC, req));
// Waiting for resource to change into DOWNLOADING state.
Assert.assertTrue(waitForResourceState(null, spyService, req,
LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING,
200));
// Waiting for download to start.
Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200));
LocalizedResource lr =
getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC,
user, null);
// Resource would now have moved into DOWNLOADING state
Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
// pending should have this resource now.
Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size());
// Now resource should have 0 permit.
Assert.assertEquals(0, lr.sem.availablePermits());
// Container - 2
// Container requesting the same resource.
ContainerImpl container2 = createMockContainer(user, 2);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2,
LocalResourceVisibility.PUBLIC, req));
// Waiting for download to start. This should return false as new download
// will not start
Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100));
// Now Failing the resource download. As a part of it
// resource state is changed and then lock is released.
ResourceFailedLocalizationEvent locFailedEvent =
new ResourceFailedLocalizationEvent(req, new Exception("test"));
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user,
null).handle(locFailedEvent);
// Waiting for resource to change into FAILED state.
Assert.assertTrue(waitForResourceState(lr, spyService, req,
LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200));
// releasing lock as a part of download failed process.
lr.unlock();
// removing pending download request.
spyService.getPublicLocalizer().pending.clear();
// Now I need to simulate a race condition wherein Event is added to
// dispatcher before resource state changes to either FAILED or LOCALIZED
// Hence sending event directly to dispatcher.
LocalizerResourceRequestEvent localizerEvent =
new LocalizerResourceRequestEvent(lr, null,
mock(LocalizerContext.class), null);
dispatcher1.getEventHandler().handle(localizerEvent);
// Waiting for download to start. This should return false as new download
// will not start
Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100));
// Checking available permits now.
Assert.assertEquals(1, lr.sem.availablePermits());
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private boolean waitForPrivateDownloadToStart(
ResourceLocalizationService service, String localizerId, int size,
int maxWaitTime) {
List<LocalizerResourceRequestEvent> pending = null;
// Waiting for localizer to be created.
do {
if (service.getPrivateLocalizers().get(localizerId) != null) {
pending = service.getPrivateLocalizers().get(localizerId).pending;
}
if (pending == null) {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
if (pending == null) {
return false;
}
do {
if (pending.size() == size) {
return true;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
}
} while (maxWaitTime > 0);
return pending.size() == size;
}
private boolean waitForPublicDownloadToStart(
ResourceLocalizationService service, int size, int maxWaitTime) {
Map<Future<Path>, LocalizerResourceRequestEvent> pending = null;
// Waiting for localizer to be created.
do {
if (service.getPublicLocalizer() != null) {
pending = service.getPublicLocalizer().pending;
}
if (pending == null) {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
if (pending == null) {
return false;
}
do {
if (pending.size() == size) {
return true;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
} while (maxWaitTime > 0);
return pending.size() == size;
}
private LocalizedResource getLocalizedResource(
ResourceLocalizationService service, LocalResourceRequest req,
LocalResourceVisibility vis, String user, ApplicationId appId) {
return service.getLocalResourcesTracker(vis, user, appId)
.getLocalizedResource(req);
}
private boolean waitForResourceState(LocalizedResource lr,
ResourceLocalizationService service, LocalResourceRequest req,
LocalResourceVisibility vis, String user, ApplicationId appId,
ResourceState resourceState, long maxWaitTime) {
LocalResourcesTracker tracker = null;
// checking tracker is created
do {
if (tracker == null) {
tracker = service.getLocalResourcesTracker(vis, user, appId);
}
if (tracker != null && lr == null) {
lr = tracker.getLocalizedResource(req);
}
if (lr != null) {
break;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
} while (maxWaitTime > 0);
// this will wait till resource state is changed to (resourceState).
if (lr == null) {
return false;
}
do {
if (!lr.getState().equals(resourceState)) {
try {
maxWaitTime -= 50;
Thread.sleep(50);
} catch (InterruptedException e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
return lr.getState().equals(resourceState);
}
private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
ContainerImpl container, LocalResourceVisibility vis,
LocalResourceRequest req) {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
List<LocalResourceRequest> resourceList =
new ArrayList<LocalResourceRequest>();
resourceList.add(req);
reqs.put(vis, resourceList);
return new ContainerLocalizationRequestEvent(container, reqs);
}
private ContainerImpl createMockContainer(String user, int containerId) {
ContainerImpl container = mock(ContainerImpl.class);
when(container.getContainerID()).thenReturn(
BuilderUtils.newContainerId(1, 1, 1, containerId));
when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class);
when(container.getCredentials()).thenReturn(mockCredentials);
return container;
}
private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path);
return url;