MAPREDUCE-6550. archive-logs tool changes log ownership to the Yarn user when using DefaultContainerExecutor (rkanter)

(cherry picked from commit 6d84cc16b3)
Robert Kanter 2015-11-25 17:12:40 -08:00
parent 8c6273d117
commit 7b4bf23b2a
6 changed files with 139 additions and 20 deletions

CHANGES.txt

@@ -361,6 +361,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
     duplicate records (wilfreds via rkanter)
 
+    MAPREDUCE-6550. archive-logs tool changes log ownership to the Yarn
+    user when using DefaultContainerExecutor (rkanter)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

HadoopArchiveLogs.java

@@ -77,6 +77,7 @@ public class HadoopArchiveLogs implements Tool {
   private static final String MEMORY_OPTION = "memory";
   private static final String VERBOSE_OPTION = "verbose";
   private static final String FORCE_OPTION = "force";
+  private static final String NO_PROXY_OPTION = "noProxy";
 
   private static final int DEFAULT_MAX_ELIGIBLE = -1;
   private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -94,6 +95,8 @@ public class HadoopArchiveLogs implements Tool {
   private boolean verbose = false;
   @VisibleForTesting
   boolean force = false;
+  @VisibleForTesting
+  boolean proxy = true;
   @VisibleForTesting
   Set<AppInfo> eligibleApplications;
 
@@ -208,6 +211,12 @@ private void handleOpts(String[] args) throws ParseException {
         "Force recreating the working directory if an existing one is found. " +
         "This should only be used if you know that another instance is " +
        "not currently running");
+    Option noProxyOpt = new Option(NO_PROXY_OPTION, false,
+        "When specified, all processing will be done as the user running this" +
+        " command (or the Yarn user if DefaultContainerExecutor is in " +
+        "use). When not specified, all processing will be done as the " +
+        "user who owns that application; if the user running this command" +
+        " is not allowed to impersonate that user, it will fail");
     opts.addOption(helpOpt);
     opts.addOption(maxEligibleOpt);
     opts.addOption(minNumLogFilesOpt);
@@ -215,6 +224,7 @@ private void handleOpts(String[] args) throws ParseException {
     opts.addOption(memoryOpt);
     opts.addOption(verboseOpt);
     opts.addOption(forceOpt);
+    opts.addOption(noProxyOpt);
 
     try {
       CommandLineParser parser = new GnuParser();
@@ -252,6 +262,9 @@ private void handleOpts(String[] args) throws ParseException {
       if (commandLine.hasOption(FORCE_OPTION)) {
         force = true;
       }
+      if (commandLine.hasOption(NO_PROXY_OPTION)) {
+        proxy = false;
+      }
     } catch (ParseException pe) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("mapred archive-logs", opts);
@@ -274,7 +287,7 @@ boolean prepareWorkingDir(FileSystem fs, Path workingDir) throws IOException {
     }
     fs.mkdirs(workingDir);
     fs.setPermission(workingDir,
-        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE));
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true));
     return true;
   }
@@ -479,6 +492,9 @@ void generateScript(File localScript, Path workingDir,
       fw.write(remoteRootLogDir.toString());
       fw.write(" -suffix ");
       fw.write(suffix);
+      if (!proxy) {
+        fw.write(" -noProxy\n");
+      }
       fw.write("\n");
     } finally {
       if (fw != null) {

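Note: with -noProxy, the per-application command that generateScript writes ends up as a single line of the generated shell script, of this shape (the directory and suffix values are placeholders; the exact string is asserted in TestHadoopArchiveLogs below):

    "$HADOOP_PREFIX"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir <workingDir> -remoteRootLogDir <remoteRootLogDir> -suffix <suffix> -noProxy

The working directory's new 1777 mode (world-writable plus the sticky bit, as on /tmp) presumably exists so that runner tasks executing as another user, e.g. the Yarn user under DefaultContainerExecutor, can write beneath it, while the sticky bit keeps users from deleting each other's files.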
HadoopArchiveLogsRunner.java

@@ -31,33 +31,45 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import java.io.File;
+import java.security.PrivilegedExceptionAction;
 
 /**
  * This is a child program designed to be used by the {@link HadoopArchiveLogs}
  * tool via the Distributed Shell. It's not meant to be run directly.
  */
 public class HadoopArchiveLogsRunner implements Tool {
-  private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
+  private static final Log LOG =
+      LogFactory.getLog(HadoopArchiveLogsRunner.class);
 
   private static final String APP_ID_OPTION = "appId";
   private static final String USER_OPTION = "user";
   private static final String WORKING_DIR_OPTION = "workingDir";
-  private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
+  private static final String REMOTE_ROOT_LOG_DIR_OPTION = "remoteRootLogDir";
   private static final String SUFFIX_OPTION = "suffix";
+  private static final String NO_PROXY_OPTION = "noProxy";
 
   private String appId;
   private String user;
   private String workingDir;
   private String remoteLogDir;
   private String suffix;
+  private boolean proxy;
 
   private JobConf conf;
 
+  private static final FsPermission HAR_DIR_PERM =
+      new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE);
+  private static final FsPermission HAR_INNER_FILES_PERM =
+      new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE);
+
   public HadoopArchiveLogsRunner(Configuration conf) {
     setConf(conf);
   }
@@ -87,13 +99,40 @@ public static void main(String[] args) {
   @Override
   public int run(String[] args) throws Exception {
     handleOpts(args);
 
+    Integer exitCode = 1;
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    // If we're running as the user, then no need to impersonate
+    // (which might fail if user is not a proxyuser for themselves)
+    // Also if !proxy is set
+    if (!proxy || loginUser.getShortUserName().equals(user)) {
+      LOG.info("Running as " + user);
+      exitCode = runInternal();
+    } else {
+      // Otherwise impersonate user. If we're not allowed to, then this will
+      // fail with an Exception
+      LOG.info("Running as " + loginUser.getShortUserName() + " but will " +
+          "impersonate " + user);
+      UserGroupInformation proxyUser =
+          UserGroupInformation.createProxyUser(user, loginUser);
+      exitCode = proxyUser.doAs(new PrivilegedExceptionAction<Integer>() {
+        @Override
+        public Integer run() throws Exception {
+          return runInternal();
+        }
+      });
+    }
+    return exitCode;
+  }
+
+  private int runInternal() throws Exception {
     String remoteAppLogDir = remoteLogDir + File.separator + user
         + File.separator + suffix + File.separator + appId;
     // Run 'hadoop archives' command in local mode
-    Configuration haConf = new Configuration(getConf());
-    haConf.set("mapreduce.framework.name", "local");
-    HadoopArchives ha = new HadoopArchives(haConf);
+    conf.set("mapreduce.framework.name", "local");
+    // Set the umask so we get 640 files and 750 dirs
+    conf.set("fs.permissions.umask-mode", "027");
+    HadoopArchives ha = new HadoopArchives(conf);
     String[] haArgs = {
         "-archiveName",
         appId + ".har",
@@ -113,9 +152,9 @@ public int run(String[] args) throws Exception {
     // Move har file to correct location and delete original logs
     try {
       fs = FileSystem.get(conf);
+      Path harDest = new Path(remoteAppLogDir, appId + ".har");
       LOG.info("Moving har to original location");
-      fs.rename(new Path(workingDir, appId + ".har"),
-          new Path(remoteAppLogDir, appId + ".har"));
+      fs.rename(new Path(workingDir, appId + ".har"), harDest);
       LOG.info("Deleting original logs");
       for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
           new PathFilter() {
@@ -131,7 +170,6 @@ public boolean accept(Path path) {
         fs.close();
       }
     }
-
     return 0;
   }
@@ -144,24 +182,30 @@ private void handleOpts(String[] args) throws ParseException {
     Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
         "Working Directory");
     workingDirOpt.setRequired(true);
-    Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
+    Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR_OPTION, true,
         "Remote Root Log Directory");
     remoteLogDirOpt.setRequired(true);
     Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
     suffixOpt.setRequired(true);
+    Option useProxyOpt = new Option(NO_PROXY_OPTION, false, "Use Proxy");
 
     opts.addOption(appIdOpt);
     opts.addOption(userOpt);
     opts.addOption(workingDirOpt);
     opts.addOption(remoteLogDirOpt);
     opts.addOption(suffixOpt);
+    opts.addOption(useProxyOpt);
 
     CommandLineParser parser = new GnuParser();
     CommandLine commandLine = parser.parse(opts, args);
     appId = commandLine.getOptionValue(APP_ID_OPTION);
     user = commandLine.getOptionValue(USER_OPTION);
     workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
-    remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
+    remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR_OPTION);
     suffix = commandLine.getOptionValue(SUFFIX_OPTION);
+    proxy = true;
+    if (commandLine.hasOption(NO_PROXY_OPTION)) {
+      proxy = false;
+    }
   }
 
   @Override

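As a sanity check on the "Set the umask so we get 640 files and 750 dirs" comment above, here is a minimal standalone sketch (not part of the patch) of the umask arithmetic behind fs.permissions.umask-mode = "027":

    public class UmaskDemo {
      public static void main(String[] args) {
        int umask = 0027;               // fs.permissions.umask-mode = "027"
        int files = 0666 & ~umask;      // new files start from mode 666
        int dirs = 0777 & ~umask;       // new directories start from mode 777
        // Prints "files=640 dirs=750", i.e. rw-r----- and rwxr-x---,
        // matching HAR_INNER_FILES_PERM and HAR_DIR_PERM declared above.
        System.out.printf("files=%o dirs=%o%n", files, dirs);
      }
    }

Note also that the default (impersonating) path relies on UserGroupInformation.createProxyUser; for the proxied operations to be authorized, the user running the tool generally has to be whitelisted on the cluster via the hadoop.proxyuser.<name>.hosts and hadoop.proxyuser.<name>.groups settings, which is exactly the requirement that -noProxy avoids.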
Site documentation (How to Archive Logs)

@@ -48,6 +48,14 @@ How to Archive Logs
                              each container (default: 1024)
     -minNumberLogFiles <n>   The minimum number of log files required
                              to be eligible (default: 20)
+    -noProxy                 When specified, all processing will be
+                             done as the user running this command (or
+                             the Yarn user if DefaultContainerExecutor
+                             is in use). When not specified, all
+                             processing will be done as the user who
+                             owns that application; if the user
+                             running this command is not allowed to
+                             impersonate that user, it will fail
     -verbose                 Print more details.
 
 The tool only supports running one instance on a cluster at a time in order
@@ -77,6 +85,15 @@ The tool works by performing the following procedure:
    the ``hadoop archives`` command for a single application and replaces
    its aggregated log files with the resulting archive.
 
+The ``-noProxy`` option makes the tool process everything as the user who is
+currently running it, or the Yarn user if DefaultContainerExecutor is in use.
+When not specified, all processing will be done by the user who owns that
+application; if the user running this command is not allowed to impersonate
+that user, it will fail. This is useful if you want an admin user to handle all
+aggregation without enabling impersonation. With ``-noProxy`` the resulting
+HAR files will be owned by whoever ran the tool, instead of whoever originally
+owned the logs.
+
 The ``-verbose`` option makes the tool print more details about what it's
 doing.

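As a usage sketch (both flags appear in the help text above; cluster specifics are omitted), an admin who wants to archive all eligible logs without impersonation might run:

    $ mapred archive-logs -noProxy -verbose

The resulting HAR files are then owned by that admin (or by the Yarn user under DefaultContainerExecutor) rather than by the original application owners, as described above.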
TestHadoopArchiveLogs.java

@@ -163,7 +163,7 @@ public void testCheckMaxEligible() throws Exception {
     Assert.assertTrue(hal.eligibleApplications.contains(app3));
   }
 
-  @Test(timeout = 10000)
+  @Test(timeout = 30000)
   public void testFilterAppsByAggregatedStatus() throws Exception {
     MiniYARNCluster yarnCluster = null;
     try {
@@ -246,6 +246,11 @@ public void testFilterAppsByAggregatedStatus() throws Exception {
 
   @Test(timeout = 10000)
   public void testGenerateScript() throws Exception {
+    _testGenerateScript(false);
+    _testGenerateScript(true);
+  }
+
+  private void _testGenerateScript(boolean proxy) throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
@@ -254,6 +259,7 @@ public void testGenerateScript() throws Exception {
         USER));
     hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
         USER));
+    hal.proxy = proxy;
 
     File localScript = new File("target", "script.sh");
     Path workingDir = new Path("/tmp", "working");
@@ -286,10 +292,21 @@ public void testGenerateScript() throws Exception {
     Assert.assertEquals("fi", lines[12]);
     Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
     Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
-    Assert.assertEquals("\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
-        "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
-        + workingDir.toString() + " -remoteRootLogDir " +
-        remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
+    if (proxy) {
+      Assert.assertEquals(
+          "\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
+              "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
+              "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
+              remoteRootLogDir.toString() + " -suffix " + suffix,
+          lines[15]);
+    } else {
+      Assert.assertEquals(
+          "\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
+              "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
+              "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
+              remoteRootLogDir.toString() + " -suffix " + suffix + " -noProxy",
+          lines[15]);
+    }
   }
 
   /**
@@ -325,7 +342,7 @@ public void testPrepareWorkingDir() throws Exception {
     Assert.assertTrue(dirPrepared);
     Assert.assertTrue(fs.exists(workingDir));
     Assert.assertEquals(
-        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE),
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
         fs.getFileStatus(workingDir).getPermission());
     // Throw a file in the dir
     Path dummyFile = new Path(workingDir, "dummy.txt");
@@ -337,6 +354,9 @@ public void testPrepareWorkingDir() throws Exception {
     Assert.assertFalse(dirPrepared);
     Assert.assertTrue(fs.exists(workingDir));
     Assert.assertTrue(fs.exists(dummyFile));
+    Assert.assertEquals(
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
+        fs.getFileStatus(workingDir).getPermission());
     // -force is true and the dir exists, so it will recreate it and the dummy
     // won't exist anymore
     hal.force = true;
@@ -344,7 +364,7 @@ public void testPrepareWorkingDir() throws Exception {
     Assert.assertTrue(dirPrepared);
     Assert.assertTrue(fs.exists(workingDir));
     Assert.assertEquals(
-        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE),
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
         fs.getFileStatus(workingDir).getPermission());
     Assert.assertFalse(fs.exists(dummyFile));
   }

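For reference, a small standalone sketch (not part of the patch) of what the FsPermission value asserted in testPrepareWorkingDir denotes through the API:

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class StickyPermDemo {
      public static void main(String[] args) {
        // ALL/ALL/ALL plus the sticky bit: octal 1777, as on /tmp.
        FsPermission perm =
            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
        System.out.println(perm);                // expected: rwxrwxrwt
        System.out.println(perm.getStickyBit()); // expected: true
      }
    }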
TestHadoopArchiveLogsRunner.java

@@ -24,7 +24,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HarFs;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -47,7 +50,7 @@ public class TestHadoopArchiveLogsRunner {
     new Random().nextBytes(DUMMY_DATA);
   }
 
-  @Test(timeout = 30000)
+  @Test(timeout = 50000)
   public void testHadoopArchiveLogs() throws Exception {
     MiniYARNCluster yarnCluster = null;
     MiniDFSCluster dfsCluster = null;
@@ -63,6 +66,7 @@ public void testHadoopArchiveLogs() throws Exception {
       yarnCluster.start();
       conf = yarnCluster.getConfig();
       dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      conf = new JobConf(conf);
 
       ApplicationId app1 =
           ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -108,10 +112,25 @@ public int compare(FileStatus o1, FileStatus o2) {
       });
       Assert.assertEquals("log1", harLogs[0].getPath().getName());
       Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
+      Assert.assertEquals(
+          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
+          harLogs[0].getPermission());
+      Assert.assertEquals(System.getProperty("user.name"),
+          harLogs[0].getOwner());
       Assert.assertEquals("log2", harLogs[1].getPath().getName());
       Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
+      Assert.assertEquals(
+          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
+          harLogs[1].getPermission());
+      Assert.assertEquals(System.getProperty("user.name"),
+          harLogs[1].getOwner());
       Assert.assertEquals("log3", harLogs[2].getPath().getName());
       Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
+      Assert.assertEquals(
+          new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
+          harLogs[2].getPermission());
+      Assert.assertEquals(System.getProperty("user.name"),
+          harLogs[2].getOwner());
       Assert.assertEquals(0, fs.listStatus(workingDir).length);
     } finally {
       if (yarnCluster != null) {