diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2203e0a95bb..f83ccf6f74c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -512,6 +512,9 @@ Release 2.7.0 - UNRELEASED YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl. (Chengbing Liu via junping_du) + YARN-3074. Nodemanager dies when localizer runner tries to write to a full + disk (Varun Saxena via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 8c84132ab02..dd50eadf458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; @@ -1063,6 +1064,7 @@ public class ResourceLocalizationService extends CompositeService @SuppressWarnings("unchecked") // dispatcher not typed public void run() { Path nmPrivateCTokensPath = null; + Throwable exception = null; try { // Get nmPrivateDir nmPrivateCTokensPath = @@ -1090,14 +1092,19 @@ public class ResourceLocalizationService extends CompositeService + dirsHandler.getDisksHealthReport(false)); } // TODO handle ExitCodeException separately? + } catch (FSError fe) { + exception = fe; } catch (Exception e) { - LOG.info("Localizer failed", e); - // 3) on error, report failure to Container and signal ABORT - // 3.1) notify resource of failed localization - ContainerId cId = context.getContainerId(); - dispatcher.getEventHandler().handle( - new ContainerResourceFailedEvent(cId, null, e.getMessage())); + exception = e; } finally { + if (exception != null) { + LOG.info("Localizer failed", exception); + // On error, report failure to Container and signal ABORT + // Notify resource of failed localization + ContainerId cId = context.getContainerId(); + dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent( + cId, null, exception.getMessage())); + } for (LocalizerResourceRequestEvent event : scheduled.values()) { event.getResource().unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 30af5a43591..d3c3521f305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; @@ -715,6 +717,86 @@ public class TestResourceLocalizationService { stateStore.close(); } } + + + @Test( timeout = 10000) + @SuppressWarnings("unchecked") // mocked generics + public void testLocalizerRunnerException() throws Exception { + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler); + dirsHandlerSpy.init(conf); + + DeletionService delServiceReal = new DeletionService(exec); + DeletionService delService = spy(delServiceReal); + delService.init(new Configuration()); + delService.start(); + + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandlerSpy, nmContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + try { + spyService.init(conf); + spyService.start(); + + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn("user0"); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + final Container c = getMockContainer(appId, 42, "user0"); + final LocalResource resource1 = getPrivateMockedResource(r); + System.out.println("Here 4"); + + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + Map> rsrcs = + new HashMap>(); + List privateResourceList = + new ArrayList(); + privateResourceList.add(req1); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); + + final Constructor[] constructors = + FSError.class.getDeclaredConstructors(); + constructors[0].setAccessible(true); + FSError fsError = + (FSError) constructors[0].newInstance(new IOException("Disk Error")); + + Mockito + .doThrow(fsError) + .when(dirsHandlerSpy) + .getLocalPathForWrite(isA(String.class)); + spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); + Thread.sleep(1000); + dispatcher.await(); + // Verify if ContainerResourceFailedEvent is invoked on FSError + verify(containerBus).handle(isA(ContainerResourceFailedEvent.class)); + } finally { + spyService.stop(); + dispatcher.stop(); + delService.stop(); + } + } @Test( timeout = 10000) @SuppressWarnings("unchecked") // mocked generics