merge YARN-71 from trunk. Fix the NodeManager to clean up local-dirs on restart. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1460809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
13017132fa
commit
a427b71a25
|
@ -70,6 +70,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-470. Support a way to disable resource monitoring on the NodeManager.
|
||||
(Siddharth Seth via hitesh)
|
||||
|
||||
YARN-71. Fix the NodeManager to clean up local-dirs on restart.
|
||||
(Xuan Gong via sseth)
|
||||
|
||||
Release 2.0.4-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.CompositeService;
|
|||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class NodeManager extends CompositeService
|
||||
implements EventHandler<NodeManagerEvent> {
|
||||
|
||||
|
@ -113,6 +115,10 @@ public class NodeManager extends CompositeService
|
|||
return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
|
||||
}
|
||||
|
||||
protected DeletionService createDeletionService(ContainerExecutor exec) {
|
||||
return new DeletionService(exec);
|
||||
}
|
||||
|
||||
protected void doSecureLogin() throws IOException {
|
||||
SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
|
||||
YarnConfiguration.NM_PRINCIPAL);
|
||||
|
@ -143,7 +149,7 @@ public class NodeManager extends CompositeService
|
|||
} catch (IOException e) {
|
||||
throw new YarnException("Failed to initialize container executor", e);
|
||||
}
|
||||
DeletionService del = new DeletionService(exec);
|
||||
DeletionService del = createDeletionService(exec);
|
||||
addService(del);
|
||||
|
||||
// NodeManager level dispatcher
|
||||
|
@ -351,6 +357,11 @@ public class NodeManager extends CompositeService
|
|||
return containerManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Context getNMContext() {
|
||||
return this.context;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
|||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -53,8 +54,10 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -175,9 +178,11 @@ public class ResourceLocalizationService extends CompositeService
|
|||
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
||||
|
||||
try {
|
||||
// TODO queue deletions here, rather than NM init?
|
||||
FileContext lfs = getLocalFileContext(conf);
|
||||
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
|
||||
|
||||
cleanUpLocalDir(lfs,delService);
|
||||
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
for (String localDir : localDirs) {
|
||||
// $local/usercache
|
||||
|
@ -926,4 +931,76 @@ public class ResourceLocalizationService extends CompositeService
|
|||
|
||||
}
|
||||
|
||||
private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
|
||||
long currentTimeStamp = System.currentTimeMillis();
|
||||
for (String localDir : dirsHandler.getLocalDirs()) {
|
||||
renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
|
||||
currentTimeStamp);
|
||||
renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
|
||||
currentTimeStamp);
|
||||
renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
|
||||
currentTimeStamp);
|
||||
try {
|
||||
deleteLocalDir(lfs, del, localDir);
|
||||
} catch (IOException e) {
|
||||
// Do nothing, just give the warning
|
||||
LOG.warn("Failed to delete localDir: " + localDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void renameLocalDir(FileContext lfs, String localDir,
|
||||
String localSubDir, long currentTimeStamp) {
|
||||
try {
|
||||
lfs.rename(new Path(localDir, localSubDir), new Path(
|
||||
localDir, localSubDir + "_DEL_" + currentTimeStamp));
|
||||
} catch (FileNotFoundException ex) {
|
||||
// No need to handle this exception
|
||||
// localSubDir may not be exist
|
||||
} catch (Exception ex) {
|
||||
// Do nothing, just give the warning
|
||||
LOG.warn("Failed to rename the local file under " +
|
||||
localDir + "/" + localSubDir);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteLocalDir(FileContext lfs, DeletionService del,
|
||||
String localDir) throws IOException {
|
||||
RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
|
||||
if (fileStatus != null) {
|
||||
while (fileStatus.hasNext()) {
|
||||
FileStatus status = fileStatus.next();
|
||||
try {
|
||||
if (status.getPath().getName().matches(".*" +
|
||||
ContainerLocalizer.USERCACHE + "_DEL_.*")) {
|
||||
cleanUpFilesFromSubDir(lfs, del, status.getPath());
|
||||
} else if (status.getPath().getName()
|
||||
.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
|
||||
||
|
||||
status.getPath().getName()
|
||||
.matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
|
||||
del.delete(null, status.getPath(), new Path[] {});
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
// Do nothing, just give the warning
|
||||
LOG.warn("Failed to delete this local Directory: " +
|
||||
status.getPath().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
|
||||
Path dirPath) throws IOException {
|
||||
RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
|
||||
if (fileStatus != null) {
|
||||
while (fileStatus.hasNext()) {
|
||||
FileStatus status = fileStatus.next();
|
||||
String owner = status.getOwner();
|
||||
del.delete(owner, status.getPath(), new Path[] {});
|
||||
}
|
||||
}
|
||||
del.delete(null, dirPath, new Path[] {});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,297 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
|
||||
public class TestNodeManagerReboot {
|
||||
|
||||
static final File basedir =
|
||||
new File("target", TestNodeManagerReboot.class.getName());
|
||||
static final File logsDir = new File(basedir, "logs");
|
||||
static final File nmLocalDir = new File(basedir, "nm0");
|
||||
static final File localResourceDir = new File(basedir, "resource");
|
||||
|
||||
static final String user = System.getProperty("user.name");
|
||||
private FileContext localFS;
|
||||
|
||||
private MyNodeManager nm;
|
||||
private DeletionService delService;
|
||||
static final Log LOG = LogFactory.getLog(TestNodeManagerReboot.class);
|
||||
|
||||
@Before
|
||||
public void setup() throws UnsupportedFileSystemException {
|
||||
localFS = FileContext.getLocalFSFileContext();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException, InterruptedException {
|
||||
localFS.delete(new Path(basedir.getPath()), true);
|
||||
if (nm != null) {
|
||||
nm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testClearLocalDirWhenNodeReboot() throws IOException {
|
||||
nm = new MyNodeManager();
|
||||
nm.start();
|
||||
// create files under fileCache
|
||||
createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
|
||||
localResourceDir.mkdirs();
|
||||
ContainerManagerImpl containerManager = nm.getContainerManager();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
URL localResourceUri =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(localResourceDir.getAbsolutePath())));
|
||||
|
||||
LocalResource localResource =
|
||||
Records.newRecord(LocalResource.class);
|
||||
localResource.setResource(localResourceUri);
|
||||
localResource.setSize(-1);
|
||||
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
localResource.setType(LocalResourceType.FILE);
|
||||
localResource.setTimestamp(localResourceDir.lastModified());
|
||||
String destinationFile = "dest_file";
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
localResources.put(destinationFile, localResource);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
containerLaunchContext.setUser(containerLaunchContext.getUser());
|
||||
List<String> commands = new ArrayList<String>();
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(Records
|
||||
.newRecord(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(1024);
|
||||
StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
GetContainerStatusRequest request =
|
||||
Records.newRecord(GetContainerStatusRequest.class);
|
||||
request.setContainerId(cId);
|
||||
Container container =
|
||||
nm.getNMContext().getContainers().get(request.getContainerId());
|
||||
|
||||
final int MAX_TRIES = 20;
|
||||
int numTries = 0;
|
||||
while (!container.getContainerState().equals(ContainerState.DONE)
|
||||
&& numTries <= MAX_TRIES) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ex) {
|
||||
// Do nothing
|
||||
}
|
||||
numTries++;
|
||||
}
|
||||
|
||||
Assert.assertEquals(ContainerState.DONE, container.getContainerState());
|
||||
|
||||
Assert.assertTrue(
|
||||
"The container should create a subDir named currentUser: " + user +
|
||||
"under localDir/usercache",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.USERCACHE) > 0);
|
||||
|
||||
Assert.assertTrue("There should be files or Dirs under nm_private when " +
|
||||
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
||||
|
||||
nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
|
||||
|
||||
numTries = 0;
|
||||
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
||||
.USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
|
||||
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
|
||||
> 0) && numTries < MAX_TRIES) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ex) {
|
||||
// Do nothing
|
||||
}
|
||||
numTries++;
|
||||
}
|
||||
|
||||
Assert.assertTrue("After NM reboots, all local files should be deleted",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
||||
.USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
|
||||
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
|
||||
== 0);
|
||||
verify(delService, times(1)).delete(eq(user),
|
||||
argThat(new PathInclude(user)));
|
||||
verify(delService, times(1)).delete(
|
||||
(String) isNull(),
|
||||
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||
+ "_DEL_")));
|
||||
verify(delService, times(1)).delete((String) isNull(),
|
||||
argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
|
||||
verify(delService, times(1)).delete((String) isNull(),
|
||||
argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
|
||||
|
||||
}
|
||||
|
||||
private int numOfLocalDirs(String localDir, String localSubDir) {
|
||||
File[] listOfFiles = new File(localDir, localSubDir).listFiles();
|
||||
if (listOfFiles == null) {
|
||||
return 0;
|
||||
} else {
|
||||
return listOfFiles.length;
|
||||
}
|
||||
}
|
||||
|
||||
private void createFiles(String dir, String subDir, int numOfFiles) {
|
||||
for (int i = 0; i < numOfFiles; i++) {
|
||||
File newFile = new File(dir + "/" + subDir, "file_" + (i + 1));
|
||||
try {
|
||||
newFile.createNewFile();
|
||||
} catch (IOException e) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerId createContainerId() {
|
||||
ApplicationId appId = Records.newRecord(ApplicationId.class);
|
||||
appId.setClusterTimestamp(0);
|
||||
appId.setId(0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
Records.newRecord(ApplicationAttemptId.class);
|
||||
appAttemptId.setApplicationId(appId);
|
||||
appAttemptId.setAttemptId(1);
|
||||
ContainerId containerId =
|
||||
Records.newRecord(ContainerId.class);
|
||||
containerId.setApplicationAttemptId(appAttemptId);
|
||||
return containerId;
|
||||
}
|
||||
|
||||
private class MyNodeManager extends NodeManager {
|
||||
|
||||
public MyNodeManager() {
|
||||
super();
|
||||
this.init(createNMConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
|
||||
context, dispatcher, healthChecker, metrics);
|
||||
return myNodeStatusUpdater;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DeletionService createDeletionService(ContainerExecutor exec) {
|
||||
delService = spy(new DeletionService(exec));
|
||||
return delService;
|
||||
}
|
||||
|
||||
// mimic part of reboot process
|
||||
@Override
|
||||
public void handle(NodeManagerEvent event) {
|
||||
switch (event.getType()) {
|
||||
case SHUTDOWN:
|
||||
this.stop();
|
||||
break;
|
||||
case REBOOT:
|
||||
this.stop();
|
||||
this.createNewMyNodeManager().start();
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
private MyNodeManager createNewMyNodeManager() {
|
||||
return new MyNodeManager();
|
||||
}
|
||||
|
||||
private YarnConfiguration createNMConfig() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
|
||||
conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
|
||||
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
||||
class PathInclude extends ArgumentMatcher<Path> {
|
||||
|
||||
final String part;
|
||||
|
||||
PathInclude(String part) {
|
||||
this.part = part;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
return ((Path) o).getName().indexOf(part) != -1;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue