YARN-547. Fixed race conditions in public and private resource localization which used to cause duplicate downloads. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470076 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-19 22:35:43 +00:00
parent d5556cb5c4
commit c570309b07
8 changed files with 597 additions and 120 deletions

View File

@ -269,6 +269,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication
caused by YARN-514. (Zhijie Shen via vinodkv) caused by YARN-514. (Zhijie Shen via vinodkv)
YARN-547. Fixed race conditions in public and private resource localization
which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Component tracking resources all of the same {@link LocalResourceVisibility} * Component tracking resources all of the same {@link LocalResourceVisibility}
* *
@ -41,4 +44,8 @@ interface LocalResourcesTracker
String getUser(); String getUser();
long nextUniqueNumber(); long nextUniqueNumber();
@VisibleForTesting
@Private
LocalizedResource getLocalizedResource(LocalResourceRequest request);
} }

View File

@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import com.google.common.annotations.VisibleForTesting;
/** /**
* A collection of {@link LocalizedResource}s all of same * A collection of {@link LocalizedResource}s all of same
@ -307,4 +310,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
public long nextUniqueNumber() { public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet(); return uniqueNumberGenerator.incrementAndGet();
} }
@VisibleForTesting
@Private
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
} }

View File

@ -78,27 +78,20 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
// From INIT (ref == 0, awaiting req) // From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition()) ResourceEventType.REQUEST, new FetchResourceTransition())
.addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchDirectTransition())
.addTransition(ResourceState.INIT, ResourceState.INIT,
ResourceEventType.RELEASE, new ReleaseTransition())
// From DOWNLOADING (ref > 0, may be localizing) // From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchSuccessTransition()) ResourceEventType.LOCALIZED, new FetchSuccessTransition())
.addTransition(ResourceState.DOWNLOADING, .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING,
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), ResourceEventType.RELEASE, new ReleaseTransition())
ResourceEventType.RELEASE, new ReleasePendingTransition())
.addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
// From LOCALIZED (ref >= 0, on disk) // From LOCALIZED (ref >= 0, on disk)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.REQUEST, new LocalizedResourceTransition()) ResourceEventType.REQUEST, new LocalizedResourceTransition())
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.RELEASE, new ReleaseTransition()) ResourceEventType.RELEASE, new ReleaseTransition())
.installTopology(); .installTopology();
@ -230,14 +223,6 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
} }
} }
private static class FetchDirectTransition extends FetchSuccessTransition {
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
LOG.warn("Resource " + rsrc + " localized without listening container");
super.transition(rsrc, event);
}
}
/** /**
* Resource localized, notify waiting containers. * Resource localized, notify waiting containers.
*/ */
@ -304,17 +289,4 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
rsrc.release(relEvent.getContainer()); rsrc.release(relEvent.getContainer());
} }
} }
private static class ReleasePendingTransition implements
MultipleArcTransition<LocalizedResource,ResourceEvent,ResourceState> {
@Override
public ResourceState transition(LocalizedResource rsrc,
ResourceEvent event) {
ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
rsrc.release(relEvent.getContainer());
return rsrc.ref.isEmpty()
? ResourceState.INIT
: ResourceState.DOWNLOADING;
}
}
} }

View File

@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload; import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService public class ResourceLocalizationService extends CompositeService
@ -492,7 +494,25 @@ public class ResourceLocalizationService extends CompositeService
+ Path.SEPARATOR + appId; + Path.SEPARATOR + appId;
return path; return path;
} }
@VisibleForTesting
@Private
public PublicLocalizer getPublicLocalizer() {
return localizerTracker.publicLocalizer;
}
@VisibleForTesting
@Private
public LocalizerRunner getLocalizerRunner(String locId) {
return localizerTracker.privLocalizers.get(locId);
}
@VisibleForTesting
@Private
public Map<String, LocalizerRunner> getPrivateLocalizers() {
return localizerTracker.privLocalizers;
}
/** /**
* Sub-component handling the spawning of {@link ContainerLocalizer}s * Sub-component handling the spawning of {@link ContainerLocalizer}s
*/ */
@ -606,41 +626,20 @@ public class ResourceLocalizationService extends CompositeService
final ExecutorService threadPool; final ExecutorService threadPool;
final CompletionService<Path> queue; final CompletionService<Path> queue;
final Map<Future<Path>,LocalizerResourceRequestEvent> pending; final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
// TODO hack to work around broken signaling
final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
PublicLocalizer(Configuration conf) { PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf), this(conf, getLocalFileContext(conf),
createLocalizerExecutor(conf), createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(), new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
} }
PublicLocalizer(Configuration conf, FileContext lfs, PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool, ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending, Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
super("Public Localizer"); super("Public Localizer");
this.lfs = lfs; this.lfs = lfs;
this.conf = conf; this.conf = conf;
this.pending = pending; this.pending = pending;
this.attempts = attempts;
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// for (int i = 0, n = localDirs.size(); i < n; ++i) {
// publicFilecache[i] =
// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
// }
// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// int i = 0;
// for (String localDir : localDirs) {
// publicFilecache[i++] =
// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
// }
this.threadPool = threadPool; this.threadPool = threadPool;
this.queue = new ExecutorCompletionService<Path>(threadPool); this.queue = new ExecutorCompletionService<Path>(threadPool);
@ -648,36 +647,45 @@ public class ResourceLocalizationService extends CompositeService
public void addResource(LocalizerResourceRequestEvent request) { public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers // TODO handle failures, cancellation, requests by other containers
LocalResourceRequest key = request.getResource().getRequest(); LocalizedResource rsrc = request.getResource();
LocalResourceRequest key = rsrc.getRequest();
LOG.info("Downloading public rsrc:" + key); LOG.info("Downloading public rsrc:" + key);
synchronized (attempts) { /*
List<LocalizerResourceRequestEvent> sigh = attempts.get(key); * Here multiple containers may request the same resource. So we need
if (null == sigh) { * to start downloading only when
* 1) ResourceState == DOWNLOADING
* 2) We are able to acquire non blocking semaphore lock.
* If not we will skip this resource as either it is getting downloaded
* or it FAILED / LOCALIZED.
*/
if (rsrc.tryAcquire()) {
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest(); LocalResource resource = request.getResource().getRequest();
try { try {
Path publicDirDestPath = dirsHandler.getLocalPathForWrite( Path publicDirDestPath =
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
ContainerLocalizer.getEstimatedSize(resource), true); + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
Path hierarchicalPath = Path hierarchicalPath =
publicRsrc.getPathForLocalization(key, publicDirDestPath); publicRsrc.getPathForLocalization(key, publicDirDestPath);
if (!hierarchicalPath.equals(publicDirDestPath)) { if (!hierarchicalPath.equals(publicDirDestPath)) {
publicDirDestPath = hierarchicalPath; publicDirDestPath = hierarchicalPath;
DiskChecker.checkDir( DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
new File(publicDirDestPath.toUri().getPath()));
} }
publicDirDestPath = publicDirDestPath =
new Path(publicDirDestPath, Long.toString(publicRsrc new Path(publicDirDestPath, Long.toString(publicRsrc
.nextUniqueNumber())); .nextUniqueNumber()));
pending.put(queue.submit(new FSDownload( pending.put(queue.submit(new FSDownload(lfs, null, conf,
lfs, null, conf, publicDirDestPath, resource)), publicDirDestPath, resource)), request);
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) { } catch (IOException e) {
rsrc.unlock();
// TODO Need to Fix IO Exceptions - Notifying resource
LOG.error("Local path for public localization is not found. " LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e); + " May be disks failed.", e);
} }
} else { } else {
sigh.add(request); rsrc.unlock();
} }
} }
} }
@ -700,24 +708,14 @@ public class ResourceLocalizationService extends CompositeService
LocalResourceRequest key = assoc.getResource().getRequest(); LocalResourceRequest key = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
.getDU(new File(local.toUri())))); .getDU(new File(local.toUri()))));
synchronized (attempts) { assoc.getResource().unlock();
attempts.remove(key);
}
} catch (ExecutionException e) { } catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(), LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause()); e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest(); LocalResourceRequest req = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
.getCause())); .getCause()));
synchronized (attempts) { assoc.getResource().unlock();
List<LocalizerResourceRequestEvent> reqs;
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
return;
}
attempts.remove(req);
}
} catch (CancellationException e) { } catch (CancellationException e) {
// ignore; shutting down // ignore; shutting down
} }
@ -776,22 +774,35 @@ public class ResourceLocalizationService extends CompositeService
i.hasNext();) { i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next(); LocalizerResourceRequestEvent evt = i.next();
LocalizedResource nRsrc = evt.getResource(); LocalizedResource nRsrc = evt.getResource();
if (ResourceState.LOCALIZED.equals(nRsrc.getState())) { // Resource download should take place ONLY if resource is in
// Downloading state
if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
i.remove(); i.remove();
continue; continue;
} }
/*
* Multiple containers will try to download the same resource. So the
* resource download should start only if
* 1) We can acquire a non blocking semaphore lock on resource
* 2) Resource is still in DOWNLOADING state
*/
if (nRsrc.tryAcquire()) { if (nRsrc.tryAcquire()) {
LocalResourceRequest nextRsrc = nRsrc.getRequest(); if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource next = LocalResourceRequest nextRsrc = nRsrc.getRequest();
recordFactory.newRecordInstance(LocalResource.class); LocalResource next =
next.setResource( recordFactory.newRecordInstance(LocalResource.class);
ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
next.setTimestamp(nextRsrc.getTimestamp()); .getPath()));
next.setType(nextRsrc.getType()); next.setTimestamp(nextRsrc.getTimestamp());
next.setVisibility(evt.getVisibility()); next.setType(nextRsrc.getType());
next.setPattern(evt.getPattern()); next.setVisibility(evt.getVisibility());
scheduled.put(nextRsrc, evt); next.setPattern(evt.getPattern());
return next; scheduled.put(nextRsrc, evt);
return next;
} else {
// Need to release acquired lock
nRsrc.unlock();
}
} }
} }
return null; return null;
@ -863,6 +874,12 @@ public class ResourceLocalizationService extends CompositeService
new ResourceLocalizedEvent(req, ConverterUtils new ResourceLocalizedEvent(req, ConverterUtils
.getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
} catch (URISyntaxException e) { } } catch (URISyntaxException e) { }
// unlocking the resource and removing it from scheduled resource
// list
assoc.getResource().unlock();
scheduled.remove(req);
if (pending.isEmpty()) { if (pending.isEmpty()) {
// TODO: Synchronization // TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE); response.setLocalizerAction(LocalizerAction.DIE);
@ -889,11 +906,16 @@ public class ResourceLocalizationService extends CompositeService
break; break;
case FETCH_FAILURE: case FETCH_FAILURE:
LOG.info("DEBUG: FAILED " + req, stat.getException()); LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE); response.setLocalizerAction(LocalizerAction.DIE);
getLocalResourcesTracker(req.getVisibility(), user, applicationId) getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle( .handle(
new ResourceFailedLocalizationEvent(req, stat.getException())); new ResourceFailedLocalizationEvent(req, stat.getException()));
// unlocking the resource and removing it from scheduled resource
// list
assoc.getResource().unlock();
scheduled.remove(req);
break; break;
default: default:
LOG.info("Unknown status: " + stat.getStatus()); LOG.info("Unknown status: " + stat.getStatus());

View File

@ -129,14 +129,10 @@ public class TestLocalResourcesTrackerImpl {
dispatcher.await(); dispatcher.await();
verifyTrackedResourceCount(tracker, 2); verifyTrackedResourceCount(tracker, 2);
// Verify resources in state INIT with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr2, mockDelService));
verifyTrackedResourceCount(tracker, 1);
// Verify resource with non zero ref count is not removed. // Verify resource with non zero ref count is not removed.
Assert.assertEquals(2, lr1.getRefCount()); Assert.assertEquals(2, lr1.getRefCount());
Assert.assertFalse(tracker.remove(lr1, mockDelService)); Assert.assertFalse(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 1); verifyTrackedResourceCount(tracker, 2);
// Localize resource1 // Localize resource1
ResourceLocalizedEvent rle = ResourceLocalizedEvent rle =
@ -151,7 +147,7 @@ public class TestLocalResourcesTrackerImpl {
// Verify resources in state LOCALIZED with ref-count=0 is removed. // Verify resources in state LOCALIZED with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr1, mockDelService)); Assert.assertTrue(tracker.remove(lr1, mockDelService));
verifyTrackedResourceCount(tracker, 0); verifyTrackedResourceCount(tracker, 1);
} finally { } finally {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.stop(); dispatcher.stop();

View File

@ -117,7 +117,7 @@ public class TestLocalizedResource {
local.handle(new ResourceReleaseEvent(rsrcA, container1)); local.handle(new ResourceReleaseEvent(rsrcA, container1));
dispatcher.await(); dispatcher.await();
verify(containerBus, never()).handle(isA(ContainerEvent.class)); verify(containerBus, never()).handle(isA(ContainerEvent.class));
assertEquals(ResourceState.INIT, local.getState()); assertEquals(ResourceState.DOWNLOADING, local.getState());
// Register C2, C3 // Register C2, C3
final ContainerId container2 = getMockContainer(2); final ContainerId container2 = getMockContainer(2);
@ -176,24 +176,6 @@ public class TestLocalizedResource {
} }
} }
@Test
public void testDirectLocalization() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(new Configuration());
try {
dispatcher.start();
LocalResource apiRsrc = createMockResource();
LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
Path p = new Path("file:///cache/rsrcA");
local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
dispatcher.await();
assertEquals(ResourceState.LOCALIZED, local.getState());
} finally {
dispatcher.stop();
}
}
static LocalResource createMockResource() { static LocalResource createMockResource() {
// mock rsrc location // mock rsrc location
org.apache.hadoop.yarn.api.records.URL uriA = org.apache.hadoop.yarn.api.records.URL uriA =

View File

@ -34,9 +34,9 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -53,6 +53,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import junit.framework.Assert; import junit.framework.Assert;
@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAc
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -677,6 +687,481 @@ public class TestResourceLocalizationService {
} }
} }
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
DrainDispatcher dispatcher1 = null;
try {
dispatcher1 = new DrainDispatcher();
String user = "testuser";
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
// mocked Resource Localization Service
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
localDirHandler.init(conf);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
localDirHandler);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
rls.handle(createApplicationLocalizationEvent(user, appId));
LocalResourceRequest req =
new LocalResourceRequest(new Path("file:///tmp"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
// We need to pre-populate the LocalizerRunner as the
// Resource Localization Service code internally starts them which
// definitely we don't want.
// creating new containers and populating corresponding localizer runners
// Container - 1
ContainerImpl container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString();
rls.getPrivateLocalizers().put(
localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1));
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container1,
LocalResourceVisibility.PRIVATE, req));
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200));
// Container - 2 now makes the request.
ContainerImpl container2 = createMockContainer(user, 2);
String localizerId2 = container2.getContainerID().toString();
rls.getPrivateLocalizers().put(
localizerId2,
rls.new LocalizerRunner(new LocalizerContext(user, container2
.getContainerID(), null), localizerId2));
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2,
LocalResourceVisibility.PRIVATE, req));
Assert
.assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200));
// Retrieving localized resource.
LocalResourcesTracker tracker =
rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user,
appId);
LocalizedResource lr = tracker.getLocalizedResource(req);
// Resource would now have moved into DOWNLOADING state
Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
// Resource should have one permit
Assert.assertEquals(1, lr.sem.availablePermits());
// Resource Localization Service receives first heart beat from
// ContainerLocalizer for container1
LocalizerHeartbeatResponse response1 =
rls.heartbeat(createLocalizerStatus(localizerId1));
// Resource must have been added to scheduled map
Assert.assertEquals(1, localizerRunner1.scheduled.size());
// Checking resource in the response and also available permits for it.
Assert.assertEquals(req.getResource(), response1.getResourceSpecs()
.get(0).getResource().getResource());
Assert.assertEquals(0, lr.sem.availablePermits());
// Resource Localization Service now receives first heart beat from
// ContainerLocalizer for container2
LocalizerHeartbeatResponse response2 =
rls.heartbeat(createLocalizerStatus(localizerId2));
// Resource must not have been added to scheduled map
Assert.assertEquals(0, localizerRunner2.scheduled.size());
// No resource is returned in response
Assert.assertEquals(0, response2.getResourceSpecs().size());
// ContainerLocalizer - 1 now sends failed resource heartbeat.
rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req));
// Resource Localization should fail and state is modified accordingly.
// Also Local should be release on the LocalizedResource.
Assert
.assertTrue(waitForResourceState(lr, rls, req,
LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED,
200));
Assert.assertTrue(lr.getState().equals(ResourceState.FAILED));
Assert.assertEquals(0, localizerRunner1.scheduled.size());
// Now Container-2 once again sends heart beat to resource localization
// service
// Now container-2 again try to download the resource it should still
// not get the resource as the resource is now not in DOWNLOADING state.
response2 = rls.heartbeat(createLocalizerStatus(localizerId2));
// Resource must not have been added to scheduled map.
// Also as the resource has failed download it will be removed from
// pending list.
Assert.assertEquals(0, localizerRunner2.scheduled.size());
Assert.assertEquals(0, localizerRunner2.pending.size());
Assert.assertEquals(0, response2.getResourceSpecs().size());
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private LocalizerStatus createLocalizerStatusForFailedResource(
String localizerId, LocalResourceRequest req) {
LocalizerStatus status = createLocalizerStatus(localizerId);
LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
resourceStatus.setException(new YarnRemoteExceptionPBImpl("test"));
resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
resourceStatus.setResource(req);
status.addResourceStatus(resourceStatus);
return status;
}
private LocalizerStatus createLocalizerStatus(String localizerId1) {
LocalizerStatus status = new LocalizerStatusPBImpl();
status.setLocalizerId(localizerId1);
return status;
}
private LocalizationEvent createApplicationLocalizationEvent(String user,
ApplicationId appId) {
Application app = mock(Application.class);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
return new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app);
}
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
public void testParallelDownloadAttemptsForPublicResource() throws Exception {
DrainDispatcher dispatcher1 = null;
String user = "testuser";
try {
// Setting up ResourceLocalization service.
Configuration conf = new Configuration();
dispatcher1 = new DrainDispatcher();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
// We don't want files to be created
doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
anyBoolean());
// creating one local directory
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[1];
for (int i = 0; i < 1; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// setting log directory.
String logDir =
lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
// Registering event handlers
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher1.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher1.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = mock(DeletionService.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
// initializing directory handler.
dirsHandler.init(conf);
dispatcher1.init(conf);
dispatcher1.start();
// Creating and initializing ResourceLocalizationService but not starting
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
// Initially pending map should be empty for public localizer
Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size());
LocalResourceRequest req =
new LocalResourceRequest(new Path("/tmp"), 123L,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");
// Initializing application
ApplicationImpl app = mock(ApplicationImpl.class);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
when(app.getAppId()).thenReturn(appId);
when(app.getUser()).thenReturn(user);
dispatcher1.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
// Container - 1
// container requesting the resource
ContainerImpl container1 = createMockContainer(user, 1);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container1,
LocalResourceVisibility.PUBLIC, req));
// Waiting for resource to change into DOWNLOADING state.
Assert.assertTrue(waitForResourceState(null, spyService, req,
LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING,
200));
// Waiting for download to start.
Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200));
LocalizedResource lr =
getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC,
user, null);
// Resource would now have moved into DOWNLOADING state
Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
// pending should have this resource now.
Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size());
// Now resource should have 0 permit.
Assert.assertEquals(0, lr.sem.availablePermits());
// Container - 2
// Container requesting the same resource.
ContainerImpl container2 = createMockContainer(user, 2);
dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2,
LocalResourceVisibility.PUBLIC, req));
// Waiting for download to start. This should return false as new download
// will not start
Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100));
// Now Failing the resource download. As a part of it
// resource state is changed and then lock is released.
ResourceFailedLocalizationEvent locFailedEvent =
new ResourceFailedLocalizationEvent(req, new Exception("test"));
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user,
null).handle(locFailedEvent);
// Waiting for resource to change into FAILED state.
Assert.assertTrue(waitForResourceState(lr, spyService, req,
LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200));
// releasing lock as a part of download failed process.
lr.unlock();
// removing pending download request.
spyService.getPublicLocalizer().pending.clear();
// Now I need to simulate a race condition wherein Event is added to
// dispatcher before resource state changes to either FAILED or LOCALIZED
// Hence sending event directly to dispatcher.
LocalizerResourceRequestEvent localizerEvent =
new LocalizerResourceRequestEvent(lr, null,
mock(LocalizerContext.class), null);
dispatcher1.getEventHandler().handle(localizerEvent);
// Waiting for download to start. This should return false as new download
// will not start
Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100));
// Checking available permits now.
Assert.assertEquals(1, lr.sem.availablePermits());
} finally {
if (dispatcher1 != null) {
dispatcher1.stop();
}
}
}
private boolean waitForPrivateDownloadToStart(
ResourceLocalizationService service, String localizerId, int size,
int maxWaitTime) {
List<LocalizerResourceRequestEvent> pending = null;
// Waiting for localizer to be created.
do {
if (service.getPrivateLocalizers().get(localizerId) != null) {
pending = service.getPrivateLocalizers().get(localizerId).pending;
}
if (pending == null) {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
if (pending == null) {
return false;
}
do {
if (pending.size() == size) {
return true;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
}
} while (maxWaitTime > 0);
return pending.size() == size;
}
private boolean waitForPublicDownloadToStart(
ResourceLocalizationService service, int size, int maxWaitTime) {
Map<Future<Path>, LocalizerResourceRequestEvent> pending = null;
// Waiting for localizer to be created.
do {
if (service.getPublicLocalizer() != null) {
pending = service.getPublicLocalizer().pending;
}
if (pending == null) {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (Exception e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
if (pending == null) {
return false;
}
do {
if (pending.size() == size) {
return true;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
} while (maxWaitTime > 0);
return pending.size() == size;
}
private LocalizedResource getLocalizedResource(
ResourceLocalizationService service, LocalResourceRequest req,
LocalResourceVisibility vis, String user, ApplicationId appId) {
return service.getLocalResourcesTracker(vis, user, appId)
.getLocalizedResource(req);
}
private boolean waitForResourceState(LocalizedResource lr,
ResourceLocalizationService service, LocalResourceRequest req,
LocalResourceVisibility vis, String user, ApplicationId appId,
ResourceState resourceState, long maxWaitTime) {
LocalResourcesTracker tracker = null;
// checking tracker is created
do {
if (tracker == null) {
tracker = service.getLocalResourcesTracker(vis, user, appId);
}
if (tracker != null && lr == null) {
lr = tracker.getLocalizedResource(req);
}
if (lr != null) {
break;
} else {
try {
maxWaitTime -= 20;
Thread.sleep(20);
} catch (InterruptedException e) {
}
}
} while (maxWaitTime > 0);
// this will wait till resource state is changed to (resourceState).
if (lr == null) {
return false;
}
do {
if (!lr.getState().equals(resourceState)) {
try {
maxWaitTime -= 50;
Thread.sleep(50);
} catch (InterruptedException e) {
}
} else {
break;
}
} while (maxWaitTime > 0);
return lr.getState().equals(resourceState);
}
private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
ContainerImpl container, LocalResourceVisibility vis,
LocalResourceRequest req) {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
List<LocalResourceRequest> resourceList =
new ArrayList<LocalResourceRequest>();
resourceList.add(req);
reqs.put(vis, resourceList);
return new ContainerLocalizationRequestEvent(container, reqs);
}
private ContainerImpl createMockContainer(String user, int containerId) {
ContainerImpl container = mock(ContainerImpl.class);
when(container.getContainerID()).thenReturn(
BuilderUtils.newContainerId(1, 1, 1, containerId));
when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class);
when(container.getCredentials()).thenReturn(mockCredentials);
return container;
}
private static URL getPath(String path) { private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path); URL url = BuilderUtils.newURL("file", null, 0, path);
return url; return url;