Merging from trunk to branch-2 to fix YARN-742.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1489601 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27ed92a3fb
commit
a912bcad0c
|
@ -437,6 +437,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||||
YARN-757. Changed TestRMRestart to use the default scheduler to avoid test
|
YARN-757. Changed TestRMRestart to use the default scheduler to avoid test
|
||||||
failures. (Bikas Saha via vinodkv)
|
failures. (Bikas Saha via vinodkv)
|
||||||
|
|
||||||
|
YARN-742. Log aggregation causes a lot of redundant setPermission calls.
|
||||||
|
(jlowe via kihwal)
|
||||||
|
|
||||||
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
YARN-158. Yarn creating package-info.java must not depend on sh.
|
YARN-158. Yarn creating package-info.java must not depend on sh.
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
@ -163,11 +164,15 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected FileSystem getFileSystem(Configuration conf) throws IOException {
|
||||||
|
return FileSystem.get(conf);
|
||||||
|
}
|
||||||
|
|
||||||
void verifyAndCreateRemoteLogDir(Configuration conf) {
|
void verifyAndCreateRemoteLogDir(Configuration conf) {
|
||||||
// Checking the existence of the TLD
|
// Checking the existence of the TLD
|
||||||
FileSystem remoteFS = null;
|
FileSystem remoteFS = null;
|
||||||
try {
|
try {
|
||||||
remoteFS = FileSystem.get(conf);
|
remoteFS = getFileSystem(conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
|
throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
|
||||||
}
|
}
|
||||||
|
@ -212,9 +217,27 @@ public class LogAggregationService extends AbstractService implements
|
||||||
|
|
||||||
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
|
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
fs.mkdirs(path, new FsPermission(fsPerm));
|
FsPermission dirPerm = new FsPermission(fsPerm);
|
||||||
|
fs.mkdirs(path, dirPerm);
|
||||||
|
FsPermission umask = FsPermission.getUMask(fs.getConf());
|
||||||
|
if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
|
||||||
fs.setPermission(path, new FsPermission(fsPerm));
|
fs.setPermission(path, new FsPermission(fsPerm));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
|
||||||
|
throws IOException {
|
||||||
|
boolean exists = true;
|
||||||
|
try {
|
||||||
|
FileStatus appDirStatus = fs.getFileStatus(path);
|
||||||
|
if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
|
||||||
|
fs.setPermission(path, APP_DIR_PERMISSIONS);
|
||||||
|
}
|
||||||
|
} catch (FileNotFoundException fnfe) {
|
||||||
|
exists = false;
|
||||||
|
}
|
||||||
|
return exists;
|
||||||
|
}
|
||||||
|
|
||||||
protected void createAppDir(final String user, final ApplicationId appId,
|
protected void createAppDir(final String user, final ApplicationId appId,
|
||||||
UserGroupInformation userUgi) {
|
UserGroupInformation userUgi) {
|
||||||
|
@ -222,57 +245,43 @@ public class LogAggregationService extends AbstractService implements
|
||||||
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
|
try {
|
||||||
// TODO: Reuse FS for user?
|
// TODO: Reuse FS for user?
|
||||||
FileSystem remoteFS = null;
|
FileSystem remoteFS = getFileSystem(getConfig());
|
||||||
Path userDir = null;
|
|
||||||
Path suffixDir = null;
|
// Only creating directories if they are missing to avoid
|
||||||
Path appDir = null;
|
// unnecessary load on the filesystem from all of the nodes
|
||||||
try {
|
Path appDir = LogAggregationUtils.getRemoteAppLogDir(
|
||||||
remoteFS = FileSystem.get(getConfig());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Failed to get remote FileSystem while processing app "
|
|
||||||
+ appId, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
userDir =
|
|
||||||
LogAggregationUtils.getRemoteLogUserDir(
|
|
||||||
LogAggregationService.this.remoteRootLogDir, user);
|
|
||||||
userDir =
|
|
||||||
userDir.makeQualified(remoteFS.getUri(),
|
|
||||||
remoteFS.getWorkingDirectory());
|
|
||||||
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Failed to create user dir [" + userDir
|
|
||||||
+ "] while processing app " + appId);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
suffixDir =
|
|
||||||
LogAggregationUtils.getRemoteLogSuffixedDir(
|
|
||||||
LogAggregationService.this.remoteRootLogDir, user,
|
|
||||||
LogAggregationService.this.remoteRootLogDirSuffix);
|
|
||||||
suffixDir =
|
|
||||||
suffixDir.makeQualified(remoteFS.getUri(),
|
|
||||||
remoteFS.getWorkingDirectory());
|
|
||||||
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Failed to create suffixed user dir [" + suffixDir
|
|
||||||
+ "] while processing app " + appId);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
appDir =
|
|
||||||
LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
LogAggregationService.this.remoteRootLogDir, appId, user,
|
LogAggregationService.this.remoteRootLogDir, appId, user,
|
||||||
LogAggregationService.this.remoteRootLogDirSuffix);
|
LogAggregationService.this.remoteRootLogDirSuffix);
|
||||||
appDir =
|
appDir = appDir.makeQualified(remoteFS.getUri(),
|
||||||
appDir.makeQualified(remoteFS.getUri(),
|
|
||||||
remoteFS.getWorkingDirectory());
|
remoteFS.getWorkingDirectory());
|
||||||
|
|
||||||
|
if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
|
||||||
|
Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
|
||||||
|
LogAggregationService.this.remoteRootLogDir, user,
|
||||||
|
LogAggregationService.this.remoteRootLogDirSuffix);
|
||||||
|
suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory());
|
||||||
|
|
||||||
|
if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
|
||||||
|
Path userDir = LogAggregationUtils.getRemoteLogUserDir(
|
||||||
|
LogAggregationService.this.remoteRootLogDir, user);
|
||||||
|
userDir = userDir.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory());
|
||||||
|
|
||||||
|
if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
|
||||||
|
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
|
||||||
|
}
|
||||||
|
|
||||||
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
|
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to create application log dir [" + appDir
|
LOG.error("Failed to setup application log directory for "
|
||||||
+ "] while processing app " + appId);
|
+ appId, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -294,7 +303,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
eventResponse = new ApplicationEvent(appId,
|
eventResponse = new ApplicationEvent(appId,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
|
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
|
||||||
} catch (YarnRuntimeException e) {
|
} catch (YarnRuntimeException e) {
|
||||||
LOG.warn("Application failed to init aggregation: " + e.getMessage());
|
LOG.warn("Application failed to init aggregation", e);
|
||||||
eventResponse = new ApplicationEvent(appId,
|
eventResponse = new ApplicationEvent(appId,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.atLeast;
|
import static org.mockito.Mockito.atLeast;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
|
@ -55,8 +56,10 @@ import junit.framework.Assert;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnRuntimeException;
|
import org.apache.hadoop.yarn.YarnRuntimeException;
|
||||||
|
@ -78,6 +81,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
@ -507,6 +511,62 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
aNewFile.delete(); //housekeeping
|
aNewFile.delete(); //housekeeping
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppLogDirCreation() throws Exception {
|
||||||
|
final String logSuffix = "logs";
|
||||||
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
|
||||||
|
localLogDir.getAbsolutePath());
|
||||||
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
|
||||||
|
|
||||||
|
InlineDispatcher dispatcher = new InlineDispatcher();
|
||||||
|
dispatcher.init(this.conf);
|
||||||
|
dispatcher.start();
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(this.conf);
|
||||||
|
final FileSystem spyFs = spy(FileSystem.get(this.conf));
|
||||||
|
|
||||||
|
LogAggregationService aggSvc = new LogAggregationService(dispatcher,
|
||||||
|
this.context, this.delSrvc, super.dirsHandler) {
|
||||||
|
@Override
|
||||||
|
protected FileSystem getFileSystem(Configuration conf) {
|
||||||
|
return spyFs;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
aggSvc.init(this.conf);
|
||||||
|
aggSvc.start();
|
||||||
|
|
||||||
|
// start an application and verify user, suffix, and app dirs created
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
||||||
|
Path userDir = fs.makeQualified(new Path(
|
||||||
|
remoteRootLogDir.getAbsolutePath(), this.user));
|
||||||
|
Path suffixDir = new Path(userDir, logSuffix);
|
||||||
|
Path appDir = new Path(suffixDir, appId.toString());
|
||||||
|
aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
|
||||||
|
verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
|
||||||
|
verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
|
||||||
|
|
||||||
|
// start another application and verify only app dir created
|
||||||
|
ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
|
||||||
|
Path appDir2 = new Path(suffixDir, appId2.toString());
|
||||||
|
aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
|
||||||
|
|
||||||
|
// start another application with the app dir already created and verify
|
||||||
|
// we do not try to create it again
|
||||||
|
ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
|
||||||
|
Path appDir3 = new Path(suffixDir, appId3.toString());
|
||||||
|
new File(appDir3.toUri().getPath()).mkdir();
|
||||||
|
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
||||||
|
verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
|
public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue