MAPREDUCE-6741. Add MR support to redact job conf properties. Contributed by Haibo Chen

(cherry picked from commit f1b74a3d9f)

Conflicts:

	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
This commit is contained in:
Jason Lowe 2016-10-05 15:33:23 +00:00
parent 5c99959b2d
commit 599146d10b
8 changed files with 151 additions and 13 deletions

View File

@ -8,6 +8,9 @@ Release 2.7.4 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-6741. Add MR support to redact job conf properties. (Haibo Chen
via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -469,16 +470,16 @@ public class JobHistoryEventHandler extends AbstractService
if (conf != null) { if (conf != null) {
// TODO Ideally this should be written out to the job dir // TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched) // (.staging/jobid/files - RecoveryService will need to be patched)
FSDataOutputStream jobFileOut = null; if (logDirConfPath != null) {
try { Configuration redactedConf = new Configuration(conf);
if (logDirConfPath != null) { MRJobConfUtil.redact(redactedConf);
jobFileOut = stagingDirFS.create(logDirConfPath, true); try (FSDataOutputStream jobFileOut = stagingDirFS
conf.writeXml(jobFileOut); .create(logDirConfPath, true)) {
jobFileOut.close(); redactedConf.writeXml(jobFileOut);
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
} }
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
} }
} }
} }

View File

@ -26,8 +26,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@XmlRootElement(name = "conf") @XmlRootElement(name = "conf")
@ -45,6 +44,7 @@ public class ConfInfo {
this.property = new ArrayList<ConfEntryInfo>(); this.property = new ArrayList<ConfEntryInfo>();
Configuration jobConf = job.loadConfFile(); Configuration jobConf = job.loadConfFile();
this.path = job.getConfFile().toString(); this.path = job.getConfFile().toString();
MRJobConfUtil.redact(jobConf);
for (Map.Entry<String, String> entry : jobConf) { for (Map.Entry<String, String> entry : jobConf) {
this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(), this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(),
jobConf.getPropertySources(entry.getKey()))); jobConf.getPropertySources(entry.getKey())));

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -370,6 +372,74 @@ public class TestJobHistoryEventHandler {
} }
} }
@Test
public void testPropertyRedactionForJHS() throws Exception {
  final Configuration conf = new Configuration();
  String sensitivePropertyName = "aws.fake.credentials.name";
  String sensitivePropertyValue = "aws.fake.credentials.val";
  conf.set(sensitivePropertyName, sensitivePropertyValue);
  conf.set(MRJobConfig.MR_JOB_REDACTED_PROPERTIES,
      sensitivePropertyName);
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  final TestParams params = new TestParams();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, params.dfsWorkDir);
  final JHEvenHandlerForTest jheh =
      new JHEvenHandlerForTest(params.mockAppContext, 0, false);

  try {
    jheh.init(conf);
    jheh.start();
    handleEvent(jheh, new JobHistoryEvent(params.jobId,
        new AMStartedEvent(params.appAttemptId, 200, params.containerId,
            "nmhost", 3000, 4000, -1)));
    handleEvent(jheh, new JobHistoryEvent(params.jobId,
        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(
            params.jobId), 0, 0, 0, JobStateInternal.FAILED.toString())));

    // The handler must redact a copy of the configuration, not the
    // caller's instance: verify the in-memory value is untouched.
    // (JUnit argument order is message, expected, actual — the original
    // had expected/actual swapped, which garbles the failure message.)
    Assert.assertEquals(sensitivePropertyName + " is modified.",
        sensitivePropertyValue, conf.get(sensitivePropertyName));

    // Load the job_conf.xml written to the JHS intermediate done
    // directory and verify the sensitive property was redacted there.
    Path jhsJobConfFile = getJobConfInIntermediateDoneDir(conf, params.jobId);
    Assert.assertTrue("The job_conf.xml file is not in the JHS directory",
        FileContext.getFileContext(conf).util().exists(jhsJobConfFile));
    Configuration jhsJobConf = new Configuration();

    try (InputStream input = FileSystem.get(conf).open(jhsJobConfFile)) {
      jhsJobConf.addResource(input);
      Assert.assertEquals(
          sensitivePropertyName + " is not redacted in HDFS.",
          MRJobConfUtil.REDACTION_REPLACEMENT_VAL,
          jhsJobConf.get(sensitivePropertyName));
    }
  } finally {
    jheh.stop();
    purgeHdfsHistoryIntermediateDoneDirectory(conf);
  }
}
/**
 * Locates the intermediate job_conf.xml the JHS writes for a job.
 *
 * @param conf configuration used to resolve the done directory and
 *             the file context
 * @param jobId the job whose conf file path is wanted
 * @return fully-qualified path of the job's intermediate conf file
 * @throws IOException if the done directory cannot be resolved
 */
private static Path getJobConfInIntermediateDoneDir(Configuration conf,
    JobId jobId) throws IOException {
  final String userDoneDir =
      JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  // Qualify against the default file context so the path carries an
  // explicit scheme/authority.
  final Path qualifiedDoneDir =
      FileContext.getFileContext(conf).makeQualified(new Path(userDoneDir));
  final String confFileName =
      JobHistoryUtils.getIntermediateConfFileName(jobId);
  return new Path(qualifiedDoneDir, confFileName);
}
/**
 * Removes the history intermediate done directory tree from the
 * mini DFS cluster so later tests start from a clean state.
 *
 * @param conf configuration used to resolve the done-dir prefix
 * @throws IOException if the recursive delete fails
 */
private void purgeHdfsHistoryIntermediateDoneDirectory(Configuration conf)
    throws IOException {
  final FileSystem fs = FileSystem.get(dfsCluster.getConfiguration(0));
  final Path doneDirPrefixPath = new Path(
      JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf));
  // Recursive delete; result intentionally ignored — best-effort cleanup.
  fs.delete(doneDirPrefixPath, true);
}
@Test (timeout=50000) @Test (timeout=50000)
public void testDefaultFsIsUsedForHistory() throws Exception { public void testDefaultFsIsUsedForHistory() throws Exception {
// Create default configuration pointing to the minicluster // Create default configuration pointing to the minicluster
@ -411,6 +481,7 @@ public class TestJobHistoryEventHandler {
localFileSystem.exists(new Path(t.dfsWorkDir))); localFileSystem.exists(new Path(t.dfsWorkDir)));
} finally { } finally {
jheh.stop(); jheh.stop();
purgeHdfsHistoryIntermediateDoneDirectory(conf);
} }
} }

View File

@ -23,6 +23,8 @@ import java.io.PrintWriter;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -59,6 +61,9 @@ public class TestBlocks {
Path path = new Path("conf"); Path path = new Path("conf");
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
configuration.set("Key for test", "Value for test"); configuration.set("Key for test", "Value for test");
final String redactedProp = "Key for redaction";
configuration.set(MRJobConfig.MR_JOB_REDACTED_PROPERTIES,
redactedProp);
when(job.getConfFile()).thenReturn(path); when(job.getConfFile()).thenReturn(path);
when(job.loadConfFile()).thenReturn(configuration); when(job.loadConfFile()).thenReturn(configuration);
@ -79,9 +84,10 @@ public class TestBlocks {
configurationBlock.render(html); configurationBlock.render(html);
pWriter.flush(); pWriter.flush();
assertTrue(data.toString().contains("Key for test")); assertTrue(data.toString().contains("Key for test"));
assertTrue(data.toString().contains("Value for test")); assertTrue(data.toString().contains("Value for test"));
assertTrue(data.toString().contains(redactedProp));
assertTrue(data.toString().contains(
MRJobConfUtil.REDACTION_REPLACEMENT_VAL));
} }
/** /**

View File

@ -896,4 +896,8 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
128; 128;
/**
* A comma-separated list of properties whose value will be redacted.
*/
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
/**
 * Utility methods for working with MapReduce job configurations.
 */
public final class MRJobConfUtil {

  /** Replacement value written in place of every redacted property. */
  public static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)";

  /**
   * Overwrites the value of each property named in
   * {@link MRJobConfig#MR_JOB_REDACTED_PROPERTIES} with
   * {@link #REDACTION_REPLACEMENT_VAL}. Mutates {@code conf} in place.
   *
   * @param conf the job configuration to redact
   */
  public static void redact(final Configuration conf) {
    for (final String propertyName
        : conf.getTrimmedStringCollection(
            MRJobConfig.MR_JOB_REDACTED_PROPERTIES)) {
      conf.set(propertyName, REDACTION_REPLACEMENT_VAL);
    }
  }

  /** Utility class — not meant to be instantiated. */
  private MRJobConfUtil() {
  }
}

View File

@ -2167,4 +2167,12 @@
app master. app master.
</description> </description>
</property> </property>
<property>
<description>
The list of job configuration properties whose value will be redacted.
</description>
<name>mapreduce.job.redacted-properties</name>
<value></value>
</property>
</configuration> </configuration>