diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3076f545311..df06c5ea41d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -325,6 +325,9 @@ Release 0.23.7 - UNRELEASED YARN-400. RM can return null application resource usage report leading to NPE in client (Jason Lowe via tgraves) + YARN-426. Failure to download a public resource prevents further downloads + (Jason Lowe via bobby) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 1a880fe8e6d..9ca812e1ddd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -659,25 +659,23 @@ public class ResourceLocalizationService extends CompositeService new ContainerResourceFailedEvent( assoc.getContext().getContainerId(), assoc.getResource().getRequest(), e.getCause())); + List reqs; synchronized (attempts) { LocalResourceRequest req = assoc.getResource().getRequest(); - List reqs = attempts.get(req); + reqs = attempts.get(req); if (null == reqs) { LOG.error("Missing pending list for " + req); return; } - if (reqs.isEmpty()) { - attempts.remove(req); - } - /* - * Do not retry for now. Once failed is failed! - * LocalizerResourceRequestEvent request = reqs.remove(0); - - pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirs, - request.getResource().getRequest(), new Random())), - request); - */ } + attempts.remove(req); + } + // let the other containers know about the localization failure + for (LocalizerResourceRequestEvent reqEvent : reqs) { + dispatcher.getEventHandler().handle( + new ContainerResourceFailedEvent( + reqEvent.getContext().getContainerId(), + reqEvent.getResource().getRequest(), e.getCause())); + } } catch (CancellationException e) { // ignore; shutting down } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index b448dea5755..7ca2c91e3c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -27,13 +27,16 @@ import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; @@ -46,6 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; @@ -102,6 +108,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestResourceLocalizationService { @@ -512,6 +520,111 @@ public class TestResourceLocalizationService { } } + @Test(timeout=20000) + @SuppressWarnings("unchecked") // mocked generics + public void testFailedPublicResource() throws Exception { + Configuration conf = new YarnConfiguration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + doNothing().when(spylfs).mkdir( + isA(Path.class), isA(FsPermission.class), anyBoolean()); + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + DrainDispatcher dispatcher = new DrainDispatcher(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + dispatcher.init(conf); + dispatcher.start(); + + try { + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandler); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext( + isA(Configuration.class)); + + spyService.init(conf); + spyService.start(); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // init container. + final Container c = getMockContainer(appId, 42); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // cause chmod to fail after a delay + final CyclicBarrier barrier = new CyclicBarrier(2); + doAnswer(new Answer() { + public Void answer(InvocationOnMock invocation) throws IOException { + try { + barrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + throw new IOException("forced failure"); + } + }).when(spylfs) + .setPermission(isA(Path.class), isA(FsPermission.class)); + + // Queue up two localization requests for the same public resource + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq)); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq); + + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + + // allow the chmod to fail now that both requests have been queued + barrier.await(); + verify(containerBus, timeout(5000).times(2)) + .handle(isA(ContainerResourceFailedEvent.class)); + } finally { + dispatcher.stop(); + } + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url;