YARN-3074. Nodemanager dies when localizer runner tries to write to a full disk. Contributed by Varun Saxena

(cherry picked from commit b379972ab3)
This commit is contained in:
Jason Lowe 2015-02-11 16:33:43 +00:00
parent a8eaec396f
commit 38333c8f29
3 changed files with 98 additions and 6 deletions

View File

@ -512,6 +512,9 @@ Release 2.7.0 - UNRELEASED
YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl. YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl.
(Chengbing Liu via junping_du) (Chengbing Liu via junping_du)
YARN-3074. Nodemanager dies when localizer runner tries to write to a full
disk (Varun Saxena via jlowe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -1063,6 +1064,7 @@ public class ResourceLocalizationService extends CompositeService
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
public void run() { public void run() {
Path nmPrivateCTokensPath = null; Path nmPrivateCTokensPath = null;
Throwable exception = null;
try { try {
// Get nmPrivateDir // Get nmPrivateDir
nmPrivateCTokensPath = nmPrivateCTokensPath =
@ -1090,14 +1092,19 @@ public class ResourceLocalizationService extends CompositeService
+ dirsHandler.getDisksHealthReport(false)); + dirsHandler.getDisksHealthReport(false));
} }
// TODO handle ExitCodeException separately? // TODO handle ExitCodeException separately?
} catch (FSError fe) {
exception = fe;
} catch (Exception e) { } catch (Exception e) {
LOG.info("Localizer failed", e); exception = e;
// 3) on error, report failure to Container and signal ABORT
// 3.1) notify resource of failed localization
ContainerId cId = context.getContainerId();
dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent(cId, null, e.getMessage()));
} finally { } finally {
if (exception != null) {
LOG.info("Localizer failed", exception);
// On error, report failure to Container and signal ABORT
// Notify resource of failed localization
ContainerId cId = context.getContainerId();
dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
cId, null, exception.getMessage()));
}
for (LocalizerResourceRequestEvent event : scheduled.values()) { for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock(); event.getResource().unlock();
} }

View File

@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.ChecksumOpt;
@ -715,6 +717,86 @@ public class TestResourceLocalizationService {
stateStore.close(); stateStore.close();
} }
} }
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizerRunnerException() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
dirsHandlerSpy.init(conf);
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(new Configuration());
delService.start();
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandlerSpy, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
try {
spyService.init(conf);
spyService.start();
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
Random r = new Random();
long seed = r.nextLong();
System.out.println("SEED: " + seed);
r.setSeed(seed);
final Container c = getMockContainer(appId, 42, "user0");
final LocalResource resource1 = getPrivateMockedResource(r);
System.out.println("Here 4");
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
List<LocalResourceRequest> privateResourceList =
new ArrayList<LocalResourceRequest>();
privateResourceList.add(req1);
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
final Constructor<?>[] constructors =
FSError.class.getDeclaredConstructors();
constructors[0].setAccessible(true);
FSError fsError =
(FSError) constructors[0].newInstance(new IOException("Disk Error"));
Mockito
.doThrow(fsError)
.when(dirsHandlerSpy)
.getLocalPathForWrite(isA(String.class));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
Thread.sleep(1000);
dispatcher.await();
// Verify if ContainerResourceFailedEvent is invoked on FSError
verify(containerBus).handle(isA(ContainerResourceFailedEvent.class));
} finally {
spyService.stop();
dispatcher.stop();
delService.stop();
}
}
@Test( timeout = 10000) @Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics