svn merge -c 1375164 from trunk FIXES: MAPREDUCE-4323. NM leaks filesystems (Jason Lowe via jeagles)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1375178 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d6c75fbbb1
commit
9f82b74cbe
|
@ -53,3 +53,5 @@ Release 0.23.3 - Unreleased
|
||||||
|
|
||||||
YARN-27. Failed refreshQueues due to misconfiguration prevents further
|
YARN-27. Failed refreshQueues due to misconfiguration prevents further
|
||||||
refreshing of queues (Arun Murthy via tgraves)
|
refreshing of queues (Arun Murthy via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-4323. NM leaks filesystems (Jason Lowe via jeagles)
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -176,10 +177,14 @@ public class ContainerLocalizer {
|
||||||
e.printStackTrace(System.out);
|
e.printStackTrace(System.out);
|
||||||
return -1;
|
return -1;
|
||||||
} finally {
|
} finally {
|
||||||
if (exec != null) {
|
try {
|
||||||
exec.shutdownNow();
|
if (exec != null) {
|
||||||
|
exec.shutdownNow();
|
||||||
|
}
|
||||||
|
LocalDirAllocator.removeContext(appCacheDirContextName);
|
||||||
|
} finally {
|
||||||
|
closeFileSystems(ugi);
|
||||||
}
|
}
|
||||||
LocalDirAllocator.removeContext(appCacheDirContextName);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +220,15 @@ public class ContainerLocalizer {
|
||||||
TimeUnit.SECONDS.sleep(duration);
|
TimeUnit.SECONDS.sleep(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void localizeFiles(LocalizationProtocol nodemanager,
|
protected void closeFileSystems(UserGroupInformation ugi) {
|
||||||
|
try {
|
||||||
|
FileSystem.closeAllForUGI(ugi);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to close filesystems: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void localizeFiles(LocalizationProtocol nodemanager,
|
||||||
CompletionService<Path> cs, UserGroupInformation ugi)
|
CompletionService<Path> cs, UserGroupInformation ugi)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
public class LogAggregationService extends AbstractService implements
|
public class LogAggregationService extends AbstractService implements
|
||||||
|
@ -203,7 +202,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
fs.setPermission(path, new FsPermission(fsPerm));
|
fs.setPermission(path, new FsPermission(fsPerm));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createAppDir(final String user, final ApplicationId appId,
|
protected void createAppDir(final String user, final ApplicationId appId,
|
||||||
UserGroupInformation userUgi) {
|
UserGroupInformation userUgi) {
|
||||||
try {
|
try {
|
||||||
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
|
@ -286,13 +285,12 @@ public class LogAggregationService extends AbstractService implements
|
||||||
this.dispatcher.getEventHandler().handle(eventResponse);
|
this.dispatcher.getEventHandler().handle(eventResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
protected void initAppAggregator(final ApplicationId appId, String user,
|
||||||
public void initAppAggregator(final ApplicationId appId, String user,
|
|
||||||
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
|
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
|
||||||
Map<ApplicationAccessType, String> appAcls) {
|
Map<ApplicationAccessType, String> appAcls) {
|
||||||
|
|
||||||
// Get user's FileSystem credentials
|
// Get user's FileSystem credentials
|
||||||
UserGroupInformation userUgi =
|
final UserGroupInformation userUgi =
|
||||||
UserGroupInformation.createRemoteUser(user);
|
UserGroupInformation.createRemoteUser(user);
|
||||||
if (credentials != null) {
|
if (credentials != null) {
|
||||||
for (Token<? extends TokenIdentifier> token : credentials
|
for (Token<? extends TokenIdentifier> token : credentials
|
||||||
|
@ -301,9 +299,6 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the app dir
|
|
||||||
createAppDir(user, appId, userUgi);
|
|
||||||
|
|
||||||
// New application
|
// New application
|
||||||
final AppLogAggregator appLogAggregator =
|
final AppLogAggregator appLogAggregator =
|
||||||
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
|
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
|
||||||
|
@ -313,6 +308,14 @@ public class LogAggregationService extends AbstractService implements
|
||||||
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
|
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
|
||||||
throw new YarnException("Duplicate initApp for " + appId);
|
throw new YarnException("Duplicate initApp for " + appId);
|
||||||
}
|
}
|
||||||
|
// wait until check for existing aggregator to create dirs
|
||||||
|
try {
|
||||||
|
// Create the app dir
|
||||||
|
createAppDir(user, appId, userUgi);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
closeFileSystems(userUgi);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO Get the user configuration for the list of containers that need log
|
// TODO Get the user configuration for the list of containers that need log
|
||||||
|
@ -325,12 +328,21 @@ public class LogAggregationService extends AbstractService implements
|
||||||
appLogAggregator.run();
|
appLogAggregator.run();
|
||||||
} finally {
|
} finally {
|
||||||
appLogAggregators.remove(appId);
|
appLogAggregators.remove(appId);
|
||||||
|
closeFileSystems(userUgi);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.threadPool.execute(aggregatorWrapper);
|
this.threadPool.execute(aggregatorWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void closeFileSystems(final UserGroupInformation userUgi) {
|
||||||
|
try {
|
||||||
|
FileSystem.closeAllForUGI(userUgi);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to close filesystems: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// for testing only
|
// for testing only
|
||||||
@Private
|
@Private
|
||||||
int getNumAggregators() {
|
int getNumAggregators() {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
|
@ -27,6 +28,7 @@ import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Matchers.same;
|
import static org.mockito.Matchers.same;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
@ -57,6 +59,7 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
@ -76,47 +79,28 @@ public class TestContainerLocalizer {
|
||||||
static final Path basedir =
|
static final Path basedir =
|
||||||
new Path("target", TestContainerLocalizer.class.getName());
|
new Path("target", TestContainerLocalizer.class.getName());
|
||||||
|
|
||||||
@Test
|
static final String appUser = "yak";
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
static final String appId = "app_RM_0";
|
||||||
public void testContainerLocalizerMain() throws Exception {
|
static final String containerId = "container_0";
|
||||||
Configuration conf = new Configuration();
|
static final InetSocketAddress nmAddr =
|
||||||
AbstractFileSystem spylfs =
|
new InetSocketAddress("foobar", 8040);
|
||||||
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
|
||||||
// don't actually create dirs
|
|
||||||
doNothing().when(spylfs).mkdir(
|
|
||||||
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
|
||||||
FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
|
||||||
final String user = "yak";
|
|
||||||
final String appId = "app_RM_0";
|
|
||||||
final String cId = "container_0";
|
|
||||||
final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 8040);
|
|
||||||
final List<Path> localDirs = new ArrayList<Path>();
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
|
||||||
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
|
||||||
}
|
|
||||||
RecordFactory mockRF = getMockLocalizerRecordFactory();
|
|
||||||
ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
|
|
||||||
appId, cId, localDirs, mockRF);
|
|
||||||
ContainerLocalizer localizer = spy(concreteLoc);
|
|
||||||
|
|
||||||
// return credential stream instead of opening local file
|
private AbstractFileSystem spylfs;
|
||||||
final Random r = new Random();
|
private Random random;
|
||||||
long seed = r.nextLong();
|
private List<Path> localDirs;
|
||||||
r.setSeed(seed);
|
private Path tokenPath;
|
||||||
System.out.println("SEED: " + seed);
|
private LocalizationProtocol nmProxy;
|
||||||
DataInputBuffer appTokens = createFakeCredentials(r, 10);
|
|
||||||
Path tokenPath =
|
@Test
|
||||||
lfs.makeQualified(new Path(
|
public void testContainerLocalizerMain() throws Exception {
|
||||||
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, cId)));
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
|
||||||
).when(spylfs).open(tokenPath);
|
|
||||||
|
|
||||||
// mock heartbeat responses from NM
|
// mock heartbeat responses from NM
|
||||||
LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
|
LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
||||||
LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
||||||
LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
LocalResource rsrcC = getMockRsrc(random,
|
||||||
LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
|
LocalResourceVisibility.APPLICATION);
|
||||||
LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
|
LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
|
||||||
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
|
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
|
||||||
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
|
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
|
||||||
Collections.singletonList(rsrcA)))
|
Collections.singletonList(rsrcA)))
|
||||||
|
@ -130,6 +114,7 @@ public class TestContainerLocalizer {
|
||||||
Collections.<LocalResource>emptyList()))
|
Collections.<LocalResource>emptyList()))
|
||||||
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
|
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
|
||||||
null));
|
null));
|
||||||
|
|
||||||
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
|
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
|
||||||
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
|
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
|
||||||
isA(UserGroupInformation.class));
|
isA(UserGroupInformation.class));
|
||||||
|
@ -142,33 +127,13 @@ public class TestContainerLocalizer {
|
||||||
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
|
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
|
||||||
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
|
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
|
||||||
isA(UserGroupInformation.class));
|
isA(UserGroupInformation.class));
|
||||||
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
|
||||||
doNothing().when(localizer).sleep(anyInt());
|
|
||||||
|
|
||||||
// return result instantly for deterministic test
|
|
||||||
ExecutorService syncExec = mock(ExecutorService.class);
|
|
||||||
CompletionService<Path> cs = mock(CompletionService.class);
|
|
||||||
when(cs.submit(isA(Callable.class)))
|
|
||||||
.thenAnswer(new Answer<Future<Path>>() {
|
|
||||||
@Override
|
|
||||||
public Future<Path> answer(InvocationOnMock invoc)
|
|
||||||
throws Throwable {
|
|
||||||
Future<Path> done = mock(Future.class);
|
|
||||||
when(done.isDone()).thenReturn(true);
|
|
||||||
FakeDownload d = (FakeDownload) invoc.getArguments()[0];
|
|
||||||
when(done.get()).thenReturn(d.call());
|
|
||||||
return done;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
|
||||||
doReturn(cs).when(localizer).createCompletionService(syncExec);
|
|
||||||
|
|
||||||
// run localization
|
// run localization
|
||||||
assertEquals(0, localizer.runLocalization(nmAddr));
|
assertEquals(0, localizer.runLocalization(nmAddr));
|
||||||
|
|
||||||
// verify created cache
|
// verify created cache
|
||||||
for (Path p : localDirs) {
|
for (Path p : localDirs) {
|
||||||
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
|
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
|
||||||
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
|
||||||
// $x/usercache/$user/filecache
|
// $x/usercache/$user/filecache
|
||||||
verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
|
verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(false));
|
||||||
|
@ -194,11 +159,91 @@ public class TestContainerLocalizer {
|
||||||
@Override
|
@Override
|
||||||
public boolean matches(Object o) {
|
public boolean matches(Object o) {
|
||||||
LocalizerStatus status = (LocalizerStatus) o;
|
LocalizerStatus status = (LocalizerStatus) o;
|
||||||
return !cId.equals(status.getLocalizerId());
|
return !containerId.equals(status.getLocalizerId());
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
|
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
||||||
|
// verify filesystems are closed when localizer doesn't fail
|
||||||
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
|
verify(localizer, never()).closeFileSystems(
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
localizer.runLocalization(nmAddr);
|
||||||
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
|
|
||||||
|
// verify filesystems are closed when localizer fails
|
||||||
|
localizer = setupContainerLocalizerForTest();
|
||||||
|
doThrow(new YarnException("Forced Failure")).when(localizer).localizeFiles(
|
||||||
|
any(LocalizationProtocol.class), any(CompletionService.class),
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
verify(localizer, never()).closeFileSystems(
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
localizer.runLocalization(nmAddr);
|
||||||
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
|
private ContainerLocalizer setupContainerLocalizerForTest()
|
||||||
|
throws Exception {
|
||||||
|
spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
||||||
|
// don't actually create dirs
|
||||||
|
doNothing().when(spylfs).mkdir(
|
||||||
|
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileContext lfs = FileContext.getFileContext(spylfs, conf);
|
||||||
|
localDirs = new ArrayList<Path>();
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
|
||||||
|
}
|
||||||
|
RecordFactory mockRF = getMockLocalizerRecordFactory();
|
||||||
|
ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
|
||||||
|
appId, containerId, localDirs, mockRF);
|
||||||
|
ContainerLocalizer localizer = spy(concreteLoc);
|
||||||
|
|
||||||
|
// return credential stream instead of opening local file
|
||||||
|
random = new Random();
|
||||||
|
long seed = random.nextLong();
|
||||||
|
System.out.println("SEED: " + seed);
|
||||||
|
random.setSeed(seed);
|
||||||
|
DataInputBuffer appTokens = createFakeCredentials(random, 10);
|
||||||
|
tokenPath =
|
||||||
|
lfs.makeQualified(new Path(
|
||||||
|
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
||||||
|
containerId)));
|
||||||
|
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
||||||
|
).when(spylfs).open(tokenPath);
|
||||||
|
|
||||||
|
nmProxy = mock(LocalizationProtocol.class);
|
||||||
|
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
||||||
|
doNothing().when(localizer).sleep(anyInt());
|
||||||
|
|
||||||
|
// return result instantly for deterministic test
|
||||||
|
ExecutorService syncExec = mock(ExecutorService.class);
|
||||||
|
CompletionService<Path> cs = mock(CompletionService.class);
|
||||||
|
when(cs.submit(isA(Callable.class)))
|
||||||
|
.thenAnswer(new Answer<Future<Path>>() {
|
||||||
|
@Override
|
||||||
|
public Future<Path> answer(InvocationOnMock invoc)
|
||||||
|
throws Throwable {
|
||||||
|
Future<Path> done = mock(Future.class);
|
||||||
|
when(done.isDone()).thenReturn(true);
|
||||||
|
FakeDownload d = (FakeDownload) invoc.getArguments()[0];
|
||||||
|
when(done.get()).thenReturn(d.call());
|
||||||
|
return done;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
doReturn(syncExec).when(localizer).createDownloadThreadPool();
|
||||||
|
doReturn(cs).when(localizer).createCompletionService(syncExec);
|
||||||
|
|
||||||
|
return localizer;
|
||||||
|
}
|
||||||
|
|
||||||
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
|
static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
|
||||||
final LocalResource rsrc;
|
final LocalResource rsrc;
|
||||||
HBMatches(LocalResource rsrc) {
|
HBMatches(LocalResource rsrc) {
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -126,9 +127,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||||
|
|
||||||
LogAggregationService logAggregationService =
|
LogAggregationService logAggregationService = spy(
|
||||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
super.dirsHandler);
|
super.dirsHandler));
|
||||||
logAggregationService.init(this.conf);
|
logAggregationService.init(this.conf);
|
||||||
logAggregationService.start();
|
logAggregationService.start();
|
||||||
|
|
||||||
|
@ -156,7 +157,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
application1));
|
application1));
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
|
// ensure filesystems were closed
|
||||||
|
verify(logAggregationService).closeFileSystems(
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
|
||||||
String containerIdStr = ConverterUtils.toString(container11);
|
String containerIdStr = ConverterUtils.toString(container11);
|
||||||
File containerLogDir = new File(app1LogDir, containerIdStr);
|
File containerLogDir = new File(app1LogDir, containerIdStr);
|
||||||
|
@ -380,9 +383,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLogAggregationFailsWithoutKillingNM() throws Exception {
|
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
|
||||||
|
|
||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
|
||||||
|
localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
|
||||||
|
@ -403,6 +407,56 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
eq(appId), eq(user), any(Credentials.class),
|
eq(appId), eq(user), any(Credentials.class),
|
||||||
any(ContainerLogsRetentionPolicy.class), anyMap());
|
any(ContainerLogsRetentionPolicy.class), anyMap());
|
||||||
|
|
||||||
|
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
|
||||||
|
this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
|
||||||
|
this.acls));
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
||||||
|
new ApplicationFinishEvent(appId,
|
||||||
|
"Application failed to init aggregation: KABOOM!")
|
||||||
|
};
|
||||||
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
// no filesystems instantiated yet
|
||||||
|
verify(logAggregationService, never()).closeFileSystems(
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
|
||||||
|
// verify trying to collect logs for containers/apps we don't know about
|
||||||
|
// doesn't blow up and tear down the NM
|
||||||
|
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
|
||||||
|
BuilderUtils.newContainerId(4, 1, 1, 1), 0));
|
||||||
|
dispatcher.await();
|
||||||
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
|
BuilderUtils.newApplicationId(1, 5)));
|
||||||
|
dispatcher.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testLogAggregationCreateDirsFailsWithoutKillingNM()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = createDispatcher();
|
||||||
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||||
|
|
||||||
|
LogAggregationService logAggregationService = spy(
|
||||||
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
|
super.dirsHandler));
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
logAggregationService.start();
|
||||||
|
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(
|
||||||
|
System.currentTimeMillis(), (int)Math.random());
|
||||||
|
doThrow(new YarnException("KABOOM!"))
|
||||||
|
.when(logAggregationService).createAppDir(any(String.class),
|
||||||
|
any(ApplicationId.class), any(UserGroupInformation.class));
|
||||||
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
|
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
|
||||||
this.user, null,
|
this.user, null,
|
||||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||||
|
@ -413,6 +467,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
};
|
};
|
||||||
checkEvents(appEventHandler, expectedEvents, false,
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
"getType", "getApplicationID", "getDiagnostic");
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
// filesystems may have been instantiated
|
||||||
|
verify(logAggregationService).closeFileSystems(
|
||||||
|
any(UserGroupInformation.class));
|
||||||
|
|
||||||
// verify trying to collect logs for containers/apps we don't know about
|
// verify trying to collect logs for containers/apps we don't know about
|
||||||
// doesn't blow up and tear down the NM
|
// doesn't blow up and tear down the NM
|
||||||
|
|
Loading…
Reference in New Issue