From 7049c83d0ec85082589f139ece8eade6b341f6f6 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 19 Apr 2013 21:18:05 +0000 Subject: [PATCH 1/4] HADOOP-9469. mapreduce/yarn source jars not included in dist tarball (Robert Parker via tgraves) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470047 13f79535-47bb-0310-9956-ffa450edef68 --- .../assemblies/hadoop-mapreduce-dist.xml | 56 +++++++++++++++ .../resources/assemblies/hadoop-tools.xml | 51 ++++++++++++- .../resources/assemblies/hadoop-yarn-dist.xml | 72 ++++++++++++++++++- .../hadoop-common/CHANGES.txt | 3 + hadoop-project/pom.xml | 1 + 5 files changed, 181 insertions(+), 2 deletions(-) diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml index ee5757644eb..155bec02c16 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml @@ -91,6 +91,62 @@ ${project.build.directory}/site /share/doc/hadoop/${hadoop.component} + + hadoop-mapreduce-client/hadoop-mapreduce-client-app/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-common/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-core/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-hs/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target + 
/share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-mapreduce-examples/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml index 1e3356d3373..e05e9206c23 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml @@ -9,7 +9,7 @@ http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. @@ -44,6 +44,55 @@ lib/native + + ../hadoop-archives/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-datajoin/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-distcp/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-extras/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-gridmix/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-rumen/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + ../hadoop-streaming/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 117d8cdbac6..8423e708cbe 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -6,7 +6,7 @@ (the "License"); 
you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -66,6 +66,76 @@ **/* + + hadoop-yarn/hadoop-yarn-api/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-client/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-common/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf etc/hadoop diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 27199debd32..c01b1599b35 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ 
b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1648,6 +1648,9 @@ Release 0.23.8 - UNRELEASED HADOOP-9233. Cover package org.apache.hadoop.io.compress.zlib with unit tests (Vadim Bondarev via jlowe) + HADOOP-9469. mapreduce/yarn source jars not included in dist tarball + (Robert Parker via tgraves) + Release 0.23.7 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 2d4e447d9ca..c90d5629cf3 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -952,6 +952,7 @@ package jar-no-fork + test-jar-no-fork From d5556cb5c4fcf167eb0c8bcdce4a1a5ef2cca496 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 19 Apr 2013 22:25:17 +0000 Subject: [PATCH 2/4] YARN-586. Fixed a typo in ApplicationSubmissionContext#setApplicationId. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470068 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../yarn/api/records/ApplicationSubmissionContext.java | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9a0e020822c..f81e59ed49d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -161,6 +161,9 @@ Release 2.0.5-beta - UNRELEASED YARN-514. Delayed store operations should not result in RM unavailability for app submission (Zhijie Shen via bikas) + YARN-586. Fixed a typo in ApplicationSubmissionContext#setApplicationId. 
+ (Zhijie Shen via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 903ad1be379..41f1ee7f737 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -58,12 +58,12 @@ public interface ApplicationSubmissionContext { /** * Set the ApplicationId of the submitted application. - * @param appplicationId ApplicationId of the submitted - * application + * @param applicationId ApplicationId of the submitted + * application */ @Public @Stable - public void setApplicationId(ApplicationId appplicationId); + public void setApplicationId(ApplicationId applicationId); /** * Get the application name. From c570309b078d3c6080e89cd90c7c2157a270aaca Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 19 Apr 2013 22:35:43 +0000 Subject: [PATCH 3/4] YARN-547. Fixed race conditions in public and private resource localization which used to cause duplicate downloads. Contributed by Omkar Vinit Joshi. 
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470076 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../localizer/LocalResourcesTracker.java | 7 + .../localizer/LocalResourcesTrackerImpl.java | 10 + .../localizer/LocalizedResource.java | 32 +- .../ResourceLocalizationService.java | 150 +++--- .../TestLocalResourcesTrackerImpl.java | 8 +- .../localizer/TestLocalizedResource.java | 20 +- .../TestResourceLocalizationService.java | 487 +++++++++++++++++- 8 files changed, 597 insertions(+), 120 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f81e59ed49d..87e356dcbcc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -269,6 +269,9 @@ Release 2.0.5-beta - UNRELEASED YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication caused by YARN-514. (Zhijie Shen via vinodkv) + YARN-547. Fixed race conditions in public and private resource localization + which used to cause duplicate downloads. 
(Omkar Vinit Joshi via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 98ec471abf0..7d00d94e4f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import com.google.common.annotations.VisibleForTesting; + /** * Component tracking resources all of the same {@link LocalResourceVisibility} * @@ -41,4 +44,8 @@ interface LocalResourcesTracker String getUser(); long nextUniqueNumber(); + + @VisibleForTesting + @Private + LocalizedResource getLocalizedResource(LocalResourceRequest request); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 786b58ca5d0..dfbeb449349 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import com.google.common.annotations.VisibleForTesting; + /** * A collection of {@link LocalizedResource}s all of same @@ -307,4 +310,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { public long nextUniqueNumber() { return uniqueNumberGenerator.incrementAndGet(); } + + @VisibleForTesting + @Private + @Override + public LocalizedResource getLocalizedResource(LocalResourceRequest request) { + return localrsrc.get(request); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java index f0cd87b573a..22304fcd695 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java @@ -78,27 +78,20 @@ public class LocalizedResource implements EventHandler { // From INIT (ref == 0, awaiting req) .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) - .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new FetchDirectTransition()) - .addTransition(ResourceState.INIT, ResourceState.INIT, - ResourceEventType.RELEASE, new ReleaseTransition()) // From DOWNLOADING (ref > 0, may be localizing) .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!! 
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, ResourceEventType.LOCALIZED, new FetchSuccessTransition()) - .addTransition(ResourceState.DOWNLOADING, - EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), - ResourceEventType.RELEASE, new ReleasePendingTransition()) + .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING, + ResourceEventType.RELEASE, new ReleaseTransition()) .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED, ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition()) // From LOCALIZED (ref >= 0, on disk) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) - .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -230,14 +223,6 @@ public class LocalizedResource implements EventHandler { } } - private static class FetchDirectTransition extends FetchSuccessTransition { - @Override - public void transition(LocalizedResource rsrc, ResourceEvent event) { - LOG.warn("Resource " + rsrc + " localized without listening container"); - super.transition(rsrc, event); - } - } - /** * Resource localized, notify waiting containers. */ @@ -304,17 +289,4 @@ public class LocalizedResource implements EventHandler { rsrc.release(relEvent.getContainer()); } } - - private static class ReleasePendingTransition implements - MultipleArcTransition { - @Override - public ResourceState transition(LocalizedResource rsrc, - ResourceEvent event) { - ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; - rsrc.release(relEvent.getContainer()); - return rsrc.ref.isEmpty() - ? 
ResourceState.INIT - : ResourceState.DOWNLOADING; - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a1f45..c852de72527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -31,7 +31,6 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; @@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; @@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; +import com.google.common.annotations.VisibleForTesting; import 
com.google.common.util.concurrent.ThreadFactoryBuilder; public class ResourceLocalizationService extends CompositeService @@ -492,7 +494,25 @@ public class ResourceLocalizationService extends CompositeService + Path.SEPARATOR + appId; return path; } + + @VisibleForTesting + @Private + public PublicLocalizer getPublicLocalizer() { + return localizerTracker.publicLocalizer; + } + @VisibleForTesting + @Private + public LocalizerRunner getLocalizerRunner(String locId) { + return localizerTracker.privLocalizers.get(locId); + } + + @VisibleForTesting + @Private + public Map getPrivateLocalizers() { + return localizerTracker.privLocalizers; + } + /** * Sub-component handling the spawning of {@link ContainerLocalizer}s */ @@ -606,41 +626,20 @@ public class ResourceLocalizationService extends CompositeService final ExecutorService threadPool; final CompletionService queue; final Map,LocalizerResourceRequestEvent> pending; - // TODO hack to work around broken signaling - final Map> attempts; PublicLocalizer(Configuration conf) { this(conf, getLocalFileContext(conf), createLocalizerExecutor(conf), - new HashMap,LocalizerResourceRequestEvent>(), - new HashMap>()); + new HashMap,LocalizerResourceRequestEvent>()); } PublicLocalizer(Configuration conf, FileContext lfs, ExecutorService threadPool, - Map,LocalizerResourceRequestEvent> pending, - Map> attempts) { + Map,LocalizerResourceRequestEvent> pending) { super("Public Localizer"); this.lfs = lfs; this.conf = conf; this.pending = pending; - this.attempts = attempts; -// List localDirs = dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// for (int i = 0, n = localDirs.size(); i < n; ++i) { -// publicFilecache[i] = -// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString(); -// } -// conf.setStrings(PUBCACHE_CTXT, publicFilecache); - -// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf); -// List localDirs = 
dirsHandler.getLocalDirs(); -// String[] publicFilecache = new String[localDirs.size()]; -// int i = 0; -// for (String localDir : localDirs) { -// publicFilecache[i++] = -// new Path(localDir, ContainerLocalizer.FILECACHE).toString(); -// } this.threadPool = threadPool; this.queue = new ExecutorCompletionService(threadPool); @@ -648,36 +647,45 @@ public class ResourceLocalizationService extends CompositeService public void addResource(LocalizerResourceRequestEvent request) { // TODO handle failures, cancellation, requests by other containers - LocalResourceRequest key = request.getResource().getRequest(); + LocalizedResource rsrc = request.getResource(); + LocalResourceRequest key = rsrc.getRequest(); LOG.info("Downloading public rsrc:" + key); - synchronized (attempts) { - List sigh = attempts.get(key); - if (null == sigh) { + /* + * Here multiple containers may request the same resource. So we need + * to start downloading only when + * 1) ResourceState == DOWNLOADING + * 2) We are able to acquire non blocking semaphore lock. + * If not we will skip this resource as either it is getting downloaded + * or it FAILED / LOCALIZED. + */ + + if (rsrc.tryAcquire()) { + if (rsrc.getState().equals(ResourceState.DOWNLOADING)) { LocalResource resource = request.getResource().getRequest(); try { - Path publicDirDestPath = dirsHandler.getLocalPathForWrite( - "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, - ContainerLocalizer.getEstimatedSize(resource), true); + Path publicDirDestPath = + dirsHandler.getLocalPathForWrite("." 
+ Path.SEPARATOR + + ContainerLocalizer.FILECACHE, + ContainerLocalizer.getEstimatedSize(resource), true); Path hierarchicalPath = - publicRsrc.getPathForLocalization(key, publicDirDestPath); + publicRsrc.getPathForLocalization(key, publicDirDestPath); if (!hierarchicalPath.equals(publicDirDestPath)) { publicDirDestPath = hierarchicalPath; - DiskChecker.checkDir( - new File(publicDirDestPath.toUri().getPath())); + DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } publicDirDestPath = new Path(publicDirDestPath, Long.toString(publicRsrc .nextUniqueNumber())); - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource)), - request); - attempts.put(key, new LinkedList()); + pending.put(queue.submit(new FSDownload(lfs, null, conf, + publicDirDestPath, resource)), request); } catch (IOException e) { + rsrc.unlock(); + // TODO Need to Fix IO Exceptions - Notifying resource LOG.error("Local path for public localization is not found. " + " May be disks failed.", e); } } else { - sigh.add(request); + rsrc.unlock(); } } } @@ -700,24 +708,14 @@ public class ResourceLocalizationService extends CompositeService LocalResourceRequest key = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil .getDU(new File(local.toUri())))); - synchronized (attempts) { - attempts.remove(key); - } + assoc.getResource().unlock(); } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); LocalResourceRequest req = assoc.getResource().getRequest(); publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e .getCause())); - synchronized (attempts) { - List reqs; - reqs = attempts.get(req); - if (null == reqs) { - LOG.error("Missing pending list for " + req); - return; - } - attempts.remove(req); - } + assoc.getResource().unlock(); } catch (CancellationException e) { // ignore; shutting down } @@ -776,22 +774,35 @@ public class ResourceLocalizationService 
extends CompositeService i.hasNext();) { LocalizerResourceRequestEvent evt = i.next(); LocalizedResource nRsrc = evt.getResource(); - if (ResourceState.LOCALIZED.equals(nRsrc.getState())) { + // Resource download should take place ONLY if resource is in + // Downloading state + if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) { i.remove(); continue; } + /* + * Multiple containers will try to download the same resource. So the + * resource download should start only if + * 1) We can acquire a non blocking semaphore lock on resource + * 2) Resource is still in DOWNLOADING state + */ if (nRsrc.tryAcquire()) { - LocalResourceRequest nextRsrc = nRsrc.getRequest(); - LocalResource next = - recordFactory.newRecordInstance(LocalResource.class); - next.setResource( - ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); - next.setTimestamp(nextRsrc.getTimestamp()); - next.setType(nextRsrc.getType()); - next.setVisibility(evt.getVisibility()); - next.setPattern(evt.getPattern()); - scheduled.put(nextRsrc, evt); - return next; + if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc + .getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + next.setPattern(evt.getPattern()); + scheduled.put(nextRsrc, evt); + return next; + } else { + // Need to release acquired lock + nRsrc.unlock(); + } } } return null; @@ -863,6 +874,12 @@ public class ResourceLocalizationService extends CompositeService new ResourceLocalizedEvent(req, ConverterUtils .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); } catch (URISyntaxException e) { } + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + if 
(pending.isEmpty()) { // TODO: Synchronization response.setLocalizerAction(LocalizerAction.DIE); @@ -889,11 +906,16 @@ public class ResourceLocalizationService extends CompositeService break; case FETCH_FAILURE: LOG.info("DEBUG: FAILED " + req, stat.getException()); - assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle( new ResourceFailedLocalizationEvent(req, stat.getException())); + + // unlocking the resource and removing it from scheduled resource + // list + assoc.getResource().unlock(); + scheduled.remove(req); + break; default: LOG.info("Unknown status: " + stat.getStatus()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index b2caba02e81..91c6c5e6abc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -129,14 +129,10 @@ public class TestLocalResourcesTrackerImpl { dispatcher.await(); verifyTrackedResourceCount(tracker, 2); - // Verify resources in state INIT with ref-count=0 is removed. - Assert.assertTrue(tracker.remove(lr2, mockDelService)); - verifyTrackedResourceCount(tracker, 1); - // Verify resource with non zero ref count is not removed. 
Assert.assertEquals(2, lr1.getRefCount()); Assert.assertFalse(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 1); + verifyTrackedResourceCount(tracker, 2); // Localize resource1 ResourceLocalizedEvent rle = @@ -151,7 +147,7 @@ public class TestLocalResourcesTrackerImpl { // Verify resources in state LOCALIZED with ref-count=0 is removed. Assert.assertTrue(tracker.remove(lr1, mockDelService)); - verifyTrackedResourceCount(tracker, 0); + verifyTrackedResourceCount(tracker, 1); } finally { if (dispatcher != null) { dispatcher.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index 07d8df1db6c..e42702a1888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -117,7 +117,7 @@ public class TestLocalizedResource { local.handle(new ResourceReleaseEvent(rsrcA, container1)); dispatcher.await(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); - assertEquals(ResourceState.INIT, local.getState()); + assertEquals(ResourceState.DOWNLOADING, local.getState()); // Register C2, C3 final ContainerId container2 = getMockContainer(2); @@ -176,24 +176,6 @@ public class TestLocalizedResource { } } - @Test - public void testDirectLocalization() throws Exception { - DrainDispatcher dispatcher = new DrainDispatcher(); - dispatcher.init(new 
Configuration()); - try { - dispatcher.start(); - LocalResource apiRsrc = createMockResource(); - LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc); - LocalizedResource local = new LocalizedResource(rsrcA, dispatcher); - Path p = new Path("file:///cache/rsrcA"); - local.handle(new ResourceLocalizedEvent(rsrcA, p, 10)); - dispatcher.await(); - assertEquals(ResourceState.LOCALIZED, local.getState()); - } finally { - dispatcher.stop(); - } - } - static LocalResource createMockResource() { // mock rsrc location org.apache.hadoop.yarn.api.records.URL uriA = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 77bde7b1795..80ff72686a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -34,9 +34,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; @@ -53,6 +53,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import 
java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import junit.framework.Assert; @@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAc import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; @@ -677,6 +687,481 @@ public class TestResourceLocalizationService { } } + @Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPrivateResource() throws Exception { + + DrainDispatcher dispatcher1 = null; + try { + dispatcher1 = new DrainDispatcher(); + String user = "testuser"; + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + 
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher1, exec, delService, + localDirHandler); + dispatcher1.register(LocalizationEventType.class, rls); + rls.init(conf); + + rls.handle(createApplicationLocalizationEvent(user, appId)); + + LocalResourceRequest req = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + + // We need to pre-populate the LocalizerRunner as the + // Resource Localization Service code internally starts them which + // definitely we don't want. 
+ + // creating new containers and populating corresponding localizer runners + + // Container - 1 + ContainerImpl container1 = createMockContainer(user, 1); + String localizerId1 = container1.getContainerID().toString(); + rls.getPrivateLocalizers().put( + localizerId1, + rls.new LocalizerRunner(new LocalizerContext(user, container1 + .getContainerID(), null), localizerId1)); + LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1); + + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container1, + LocalResourceVisibility.PRIVATE, req)); + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200)); + + // Container - 2 now makes the request. + ContainerImpl container2 = createMockContainer(user, 2); + String localizerId2 = container2.getContainerID().toString(); + rls.getPrivateLocalizers().put( + localizerId2, + rls.new LocalizerRunner(new LocalizerContext(user, container2 + .getContainerID(), null), localizerId2)); + LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container2, + LocalResourceVisibility.PRIVATE, req)); + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200)); + + // Retrieving localized resource. 
+ LocalResourcesTracker tracker = + rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user, + appId); + LocalizedResource lr = tracker.getLocalizedResource(req); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + // Resource should have one permit + Assert.assertEquals(1, lr.sem.availablePermits()); + + // Resource Localization Service receives first heart beat from + // ContainerLocalizer for container1 + LocalizerHeartbeatResponse response1 = + rls.heartbeat(createLocalizerStatus(localizerId1)); + + // Resource must have been added to scheduled map + Assert.assertEquals(1, localizerRunner1.scheduled.size()); + // Checking resource in the response and also available permits for it. + Assert.assertEquals(req.getResource(), response1.getResourceSpecs() + .get(0).getResource().getResource()); + Assert.assertEquals(0, lr.sem.availablePermits()); + + // Resource Localization Service now receives first heart beat from + // ContainerLocalizer for container2 + LocalizerHeartbeatResponse response2 = + rls.heartbeat(createLocalizerStatus(localizerId2)); + + // Resource must not have been added to scheduled map + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + // No resource is returned in response + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + // ContainerLocalizer - 1 now sends failed resource heartbeat. + rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req)); + + // Resource Localization should fail and state is modified accordingly. + // Also the lock should be released on the LocalizedResource.
+ Assert + .assertTrue(waitForResourceState(lr, rls, req, + LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED, + 200)); + Assert.assertTrue(lr.getState().equals(ResourceState.FAILED)); + Assert.assertEquals(0, localizerRunner1.scheduled.size()); + + // Now Container-2 once again sends heart beat to resource localization + // service + + // Now container-2 again try to download the resource it should still + // not get the resource as the resource is now not in DOWNLOADING state. + response2 = rls.heartbeat(createLocalizerStatus(localizerId2)); + + // Resource must not have been added to scheduled map. + // Also as the resource has failed download it will be removed from + // pending list. + Assert.assertEquals(0, localizerRunner2.scheduled.size()); + Assert.assertEquals(0, localizerRunner2.pending.size()); + Assert.assertEquals(0, response2.getResourceSpecs().size()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + } + + private LocalizerStatus createLocalizerStatusForFailedResource( + String localizerId, LocalResourceRequest req) { + LocalizerStatus status = createLocalizerStatus(localizerId); + LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl(); + resourceStatus.setException(new YarnRemoteExceptionPBImpl("test")); + resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE); + resourceStatus.setResource(req); + status.addResourceStatus(resourceStatus); + return status; + } + + private LocalizerStatus createLocalizerStatus(String localizerId1) { + LocalizerStatus status = new LocalizerStatusPBImpl(); + status.setLocalizerId(localizerId1); + return status; + } + + private LocalizationEvent createApplicationLocalizationEvent(String user, + ApplicationId appId) { + Application app = mock(Application.class); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + return new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app); + } + + 
@Test(timeout = 100000) + @SuppressWarnings("unchecked") + public void testParallelDownloadAttemptsForPublicResource() throws Exception { + + DrainDispatcher dispatcher1 = null; + String user = "testuser"; + try { + // Setting up ResourceLocalization service. + Configuration conf = new Configuration(); + dispatcher1 = new DrainDispatcher(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + // Registering event handlers + EventHandler applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + // Creating and initializing ResourceLocalizationService but not starting + // it as otherwise it will remove requests from pending queue. 
+ ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher1, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + dispatcher1.register(LocalizationEventType.class, spyService); + spyService.init(conf); + + // Initially pending map should be empty for public localizer + Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size()); + + LocalResourceRequest req = + new LocalResourceRequest(new Path("/tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, ""); + + // Initializing application + ApplicationImpl app = mock(ApplicationImpl.class); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + when(app.getAppId()).thenReturn(appId); + when(app.getUser()).thenReturn(user); + dispatcher1.getEventHandler().handle( + new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + + // Container - 1 + + // container requesting the resource + ContainerImpl container1 = createMockContainer(user, 1); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container1, + LocalResourceVisibility.PUBLIC, req)); + + // Waiting for resource to change into DOWNLOADING state. + Assert.assertTrue(waitForResourceState(null, spyService, req, + LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING, + 200)); + + // Waiting for download to start. + Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200)); + + LocalizedResource lr = + getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC, + user, null); + // Resource would now have moved into DOWNLOADING state + Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState()); + + // pending should have this resource now. + Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size()); + // Now resource should have 0 permit. 
+ Assert.assertEquals(0, lr.sem.availablePermits()); + + // Container - 2 + + // Container requesting the same resource. + ContainerImpl container2 = createMockContainer(user, 2); + dispatcher1.getEventHandler().handle( + createContainerLocalizationEvent(container2, + LocalResourceVisibility.PUBLIC, req)); + + // Waiting for download to start. This should return false as new download + // will not start + Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100)); + + // Now Failing the resource download. As a part of it + // resource state is changed and then lock is released. + ResourceFailedLocalizationEvent locFailedEvent = + new ResourceFailedLocalizationEvent(req, new Exception("test")); + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user, + null).handle(locFailedEvent); + + // Waiting for resource to change into FAILED state. + Assert.assertTrue(waitForResourceState(lr, spyService, req, + LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200)); + // releasing lock as a part of download failed process. + lr.unlock(); + // removing pending download request. + spyService.getPublicLocalizer().pending.clear(); + + // Now I need to simulate a race condition wherein Event is added to + // dispatcher before resource state changes to either FAILED or LOCALIZED + // Hence sending event directly to dispatcher. + LocalizerResourceRequestEvent localizerEvent = + new LocalizerResourceRequestEvent(lr, null, + mock(LocalizerContext.class), null); + + dispatcher1.getEventHandler().handle(localizerEvent); + // Waiting for download to start. This should return false as new download + // will not start + Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100)); + // Checking available permits now. 
+ Assert.assertEquals(1, lr.sem.availablePermits()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + + } + + private boolean waitForPrivateDownloadToStart( + ResourceLocalizationService service, String localizerId, int size, + int maxWaitTime) { + List<LocalizerResourceRequestEvent> pending = null; + // Waiting for localizer to be created. + do { + if (service.getPrivateLocalizers().get(localizerId) != null) { + pending = service.getPrivateLocalizers().get(localizerId).pending; + } + if (pending == null) { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + if (pending == null) { + return false; + } + do { + if (pending.size() == size) { + return true; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } + } while (maxWaitTime > 0); + return pending.size() == size; + } + + private boolean waitForPublicDownloadToStart( + ResourceLocalizationService service, int size, int maxWaitTime) { + Map<Future<Path>, LocalizerResourceRequestEvent> pending = null; + // Waiting for localizer to be created.
+ do { + if (service.getPublicLocalizer() != null) { + pending = service.getPublicLocalizer().pending; + } + if (pending == null) { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (Exception e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + if (pending == null) { + return false; + } + do { + if (pending.size() == size) { + return true; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (InterruptedException e) { + } + } + } while (maxWaitTime > 0); + return pending.size() == size; + + } + + private LocalizedResource getLocalizedResource( + ResourceLocalizationService service, LocalResourceRequest req, + LocalResourceVisibility vis, String user, ApplicationId appId) { + return service.getLocalResourcesTracker(vis, user, appId) + .getLocalizedResource(req); + } + + private boolean waitForResourceState(LocalizedResource lr, + ResourceLocalizationService service, LocalResourceRequest req, + LocalResourceVisibility vis, String user, ApplicationId appId, + ResourceState resourceState, long maxWaitTime) { + LocalResourcesTracker tracker = null; + // checking tracker is created + do { + if (tracker == null) { + tracker = service.getLocalResourcesTracker(vis, user, appId); + } + if (tracker != null && lr == null) { + lr = tracker.getLocalizedResource(req); + } + if (lr != null) { + break; + } else { + try { + maxWaitTime -= 20; + Thread.sleep(20); + } catch (InterruptedException e) { + } + } + } while (maxWaitTime > 0); + // this will wait till resource state is changed to (resourceState). 
+ if (lr == null) { + return false; + } + do { + if (!lr.getState().equals(resourceState)) { + try { + maxWaitTime -= 50; + Thread.sleep(50); + } catch (InterruptedException e) { + } + } else { + break; + } + } while (maxWaitTime > 0); + return lr.getState().equals(resourceState); + } + + private ContainerLocalizationRequestEvent createContainerLocalizationEvent( + ContainerImpl container, LocalResourceVisibility vis, + LocalResourceRequest req) { + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs = + new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>(); + List<LocalResourceRequest> resourceList = + new ArrayList<LocalResourceRequest>(); + resourceList.add(req); + reqs.put(vis, resourceList); + return new ContainerLocalizationRequestEvent(container, reqs); + } + + private ContainerImpl createMockContainer(String user, int containerId) { + ContainerImpl container = mock(ContainerImpl.class); + when(container.getContainerID()).thenReturn( + BuilderUtils.newContainerId(1, 1, 1, containerId)); + when(container.getUser()).thenReturn(user); + Credentials mockCredentials = mock(Credentials.class); + when(container.getCredentials()).thenReturn(mockCredentials); + return container; + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url; From a91067fc5e3e4e0d619d29c500297a960b586153 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 20 Apr 2013 00:35:29 +0000 Subject: [PATCH 4/4] YARN-542. Changed the default global AM max-attempts value to be not one. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1470094 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../org/apache/hadoop/yarn/conf/YarnConfiguration.java | 2 +- .../src/main/resources/yarn-default.xml | 5 +++-- .../hadoop/yarn/server/resourcemanager/TestRMRestart.java | 8 ++++++-- .../resourcemanager/rmapp/TestRMAppTransitions.java | 5 ++++- .../resourcemanager/webapp/TestRMWebServicesApps.java | 7 +++++-- 6 files changed, 22 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 87e356dcbcc..aad620e075b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.0.5-beta - UNRELEASED YARN-586. Fixed a typo in ApplicationSubmissionContext#setApplicationId. (Zhijie Shen via vinodkv) + YARN-542. Changed the default global AM max-attempts value to be not one. + (Zhijie Shen via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 786598b9c36..1ff85eddc95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -186,7 +186,7 @@ public class YarnConfiguration extends Configuration { */ public static final String RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.max-attempts"; - public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 1; + public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; /** The keytab for the resource manager.*/ public static final String RM_KEYTAB = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f873ff9d052..15775bed557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -145,9 +145,10 @@ setting for all application masters. Each application master can specify its individual maximum number of application attempts via the API, but the individual number cannot be more than the global upper bound. If it is, - the resourcemanager will override it. + the resourcemanager will override it. The default number is set to 2, to + allow at least one retry for AM. yarn.resourcemanager.am.max-attempts - 1 + 2 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 78adf79eba0..6e75297be7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -64,7 +64,9 @@ public class TestRMRestart { "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); conf.set(YarnConfiguration.RM_SCHEDULER, "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5); + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); MemoryRMStateStore memStore = new 
MemoryRMStateStore(); memStore.init(conf); @@ -321,7 +323,9 @@ public class TestRMRestart { conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); - conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 65d5b323263..60ea8fc5672 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -62,7 +62,8 @@ public class TestRMAppTransitions { static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); private RMContext rmContext; - private static int maxAppAttempts = 4; + private static int maxAppAttempts = + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; private static int appId = 1; private DrainDispatcher rmDispatcher; @@ -499,6 +500,7 @@ public class TestRMAppTransitions { RMApp application = testCreateAppAccepted(null); // ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED + Assert.assertTrue(maxAppAttempts > 1); for (int i=1; i < maxAppAttempts; i++) { RMAppEvent event = new 
RMAppFailedAttemptEvent(application.getApplicationId(), @@ -562,6 +564,7 @@ public class TestRMAppTransitions { Assert.assertEquals(expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId()); // RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED + Assert.assertTrue(maxAppAttempts > 1); for (int i=1; i 1); int retriesLeft = maxAppAttempts; while (--retriesLeft > 0) { RMAppEvent event =