diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
index d4b1b92946e..5a2091304f2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
@@ -165,10 +165,11 @@ else if ("file".equals(url.getProtocol())) {
if (!testDir.exists()) {
testDir.mkdirs();
}
- File tempJar = File.createTempFile("hadoop-", "", testDir);
- tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+ File tempFile = File.createTempFile("hadoop-", "", testDir);
+ File tempJar = new File(tempFile.getAbsolutePath() + ".jar");
createJar(baseDir, tempJar);
tempJar.deleteOnExit();
+ tempFile.deleteOnExit();
return tempJar.getAbsolutePath();
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 51959b0fbef..ac585bffbdf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -104,6 +105,8 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -111,15 +114,24 @@
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
- private static Path outputDir = new Path(new File("target",
- TestRecovery.class.getName()).getAbsolutePath() +
- Path.SEPARATOR + "out");
+
+ private static File testRootDir;
+ private static Path outputDir;
private static String partFile = "part-r-00000";
private Text key1 = new Text("key1");
private Text key2 = new Text("key2");
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ // setup the test root directory
+ testRootDir =
+ GenericTestUtils.setupTestRootDir(
+ TestRecovery.class);
+ outputDir = new Path(testRootDir.getAbsolutePath(), "out");
+ }
+
/**
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
* completely disappears because of failed launch, one attempt gets killed and
@@ -598,8 +610,8 @@ public void testRecoveryWithSpillEncryption() throws Exception {
MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
true, ++runCount) {
};
- Configuration conf = new Configuration();
- conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ Configuration conf =
+ MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
index e79ec664a56..4d2e8909769 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
@@ -44,7 +44,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
/**
* BackupStore is an utility class that is used to support
@@ -575,7 +575,7 @@ private Writer createSpillFile() throws IOException {
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
-1, conf);
FSDataOutputStream out = fs.create(file);
- out = CryptoUtils.wrapIfNecessary(conf, out);
+ out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp);
return new Writer<K, V>(conf, out, null, null, null, null, true);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 306c728b65e..f7b432c8879 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.CryptoUtils;
@@ -1615,7 +1616,9 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
- partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+ partitionOut =
+ IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+ filename);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
@@ -1672,6 +1675,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1712,7 +1716,9 @@ private void spillSingleRecord(final K key, final V value,
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+ partitionOut =
+ IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+ filename);
writer = new IFile.Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
@@ -1746,6 +1752,7 @@ private void spillSingleRecord(final K key, final V value,
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1839,15 +1846,19 @@ private void mergeParts() throws IOException, InterruptedException,
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
+ Path indexFileOutput =
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
if (indexCacheList.size() == 0) {
- sameVolRename(mapOutputFile.getSpillIndexFile(0),
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+ Path indexFilePath = mapOutputFile.getSpillIndexFile(0);
+ IntermediateEncryptedStream.validateSpillIndexFile(
+ indexFilePath, job);
+ sameVolRename(indexFilePath, indexFileOutput);
} else {
- indexCacheList.get(0).writeToFile(
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+ indexCacheList.get(0).writeToFile(indexFileOutput, job);
}
+ IntermediateEncryptedStream.addSpillIndexFile(indexFileOutput, job);
sortPhase.complete();
return;
}
@@ -1855,6 +1866,7 @@ private void mergeParts() throws IOException, InterruptedException,
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+ IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1866,7 +1878,7 @@ private void mergeParts() throws IOException, InterruptedException,
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
-
+ IntermediateEncryptedStream.addSpillIndexFile(finalIndexFile, job);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
FSDataOutputStream finalPartitionOut = null;
@@ -1878,8 +1890,9 @@ private void mergeParts() throws IOException, InterruptedException,
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
- finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
- false);
+ finalPartitionOut =
+ IntermediateEncryptedStream.wrapIfNecessary(job, finalOut,
+ false, finalOutputFile);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
writer.close();
@@ -1942,7 +1955,8 @@ private void mergeParts() throws IOException, InterruptedException,
//write merged output to disk
long segmentStart = finalOut.getPos();
- finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
+ finalPartitionOut = IntermediateEncryptedStream.wrapIfNecessary(job,
+ finalOut, false, finalOutputFile);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index 3667e3ca889..77171a8ce40 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -302,7 +303,7 @@ void init(Counters.Counter readsCounter) throws IOException {
FSDataInputStream in = fs.open(file);
in.seek(segmentOffset);
- in = CryptoUtils.wrapIfNecessary(conf, in);
+ in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file);
reader = new Reader<K, V>(conf, in,
segmentLength - CryptoUtils.cryptoPadding(conf),
codec, readsCounter);
@@ -730,7 +731,8 @@ RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
approxOutputSize, conf);
FSDataOutputStream out = fs.create(outputFile);
- out = CryptoUtils.wrapIfNecessary(conf, out);
+ out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
+ outputFile);
Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
codec, writesCounter, true);
writeFile(this, writer, reporter, conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java
new file mode 100644
index 00000000000..eb14a208c99
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * Used to wrap the streams of intermediate spill files.
+ * Setting the {@link SpillCallBackInjector} helps in:
+ * 1- adding callbacks that capture the paths of the spilled files;
+ * 2- verifying the encryption when intermediate encryption is enabled.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class IntermediateEncryptedStream {
+
+ private static SpillCallBackInjector prevSpillCBInjector = null;
+
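+ /**
+ * Wraps the spill output stream. The configured {@link SpillCallBackInjector}
+ * is notified of the spill path (a test-only hook, a no-op in production)
+ * before the stream is handed to {@link CryptoUtils#wrapIfNecessary}, which
+ * encrypts it when intermediate-data encryption is enabled.
+ */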
+ public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+ FSDataOutputStream out, Path outPath) throws IOException {
+ SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+ return CryptoUtils.wrapIfNecessary(conf, out, true);
+ }
+
+ public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+ FSDataOutputStream out, boolean closeOutputStream,
+ Path outPath) throws IOException {
+ SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+ return CryptoUtils.wrapIfNecessary(conf, out, closeOutputStream);
+ }
+
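+ /**
+ * Wraps the spill input stream, reporting the read to the configured
+ * {@link SpillCallBackInjector} before delegating to
+ * {@link CryptoUtils#wrapIfNecessary} for decryption.
+ */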
+ public static FSDataInputStream wrapIfNecessary(Configuration conf,
+ FSDataInputStream in, Path inputPath) throws IOException {
+ SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+ return CryptoUtils.wrapIfNecessary(conf, in);
+ }
+
+ public static InputStream wrapIfNecessary(Configuration conf,
+ InputStream in, long length, Path inputPath) throws IOException {
+ SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+ return CryptoUtils.wrapIfNecessary(conf, in, length);
+ }
+
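+ /**
+ * Registers a newly written spill index file with the configured injector so
+ * tests can later validate it; a no-op with the default injector.
+ */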
+ public static void addSpillIndexFile(Path indexFilename, Configuration conf) {
+ SpillCallBackInjector.get().addSpillIndexFileCB(indexFilename, conf);
+ }
+
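+ /**
+ * Asks the configured injector to verify that the spill index file was
+ * registered through {@link #addSpillIndexFile}; a no-op with the default
+ * injector.
+ */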
+ public static void validateSpillIndexFile(Path indexFilename,
+ Configuration conf) {
+ SpillCallBackInjector.get().validateSpillIndexFileCB(indexFilename, conf);
+ }
+
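+ /**
+ * Restores the {@link SpillCallBackInjector} that was in place before the
+ * last call to {@link #setSpillCBInjector}.
+ */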
+ public static SpillCallBackInjector resetSpillCBInjector() {
+ return setSpillCBInjector(prevSpillCBInjector);
+ }
+
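+ /**
+ * Installs a new {@link SpillCallBackInjector} and remembers the previous one
+ * so it can be restored with {@link #resetSpillCBInjector()}. A sketch of the
+ * intended test usage, mirroring TestLocalJobSubmission in this change:
+ * <pre>{@code
+ * SpillCallBackPathsFinder finder = (SpillCallBackPathsFinder)
+ *     IntermediateEncryptedStream.setSpillCBInjector(
+ *         new SpillCallBackPathsFinder());
+ * // run the job under test ...
+ * assertTrue(finder.getEncryptedSpilledFiles().size() > 0);
+ * IntermediateEncryptedStream.resetSpillCBInjector();
+ * }</pre>
+ */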
+ public synchronized static SpillCallBackInjector setSpillCBInjector(
+ SpillCallBackInjector spillInjector) {
+ prevSpillCBInjector =
+ SpillCallBackInjector.getAndSet(spillInjector);
+ return spillInjector;
+ }
+
+ private IntermediateEncryptedStream() {}
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java
new file mode 100644
index 00000000000..4677fe4be23
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Used for injecting callbacks while spilling files.
+ * Calls into this class are no-ops in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class SpillCallBackInjector {
+ private static SpillCallBackInjector instance = new SpillCallBackInjector();
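+
+ /**
+ * @return the currently installed injector; the no-op default unless a test
+ * has replaced it via {@link #getAndSet}.
+ */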
+ public static SpillCallBackInjector get() {
+ return instance;
+ }
+ /**
+ * Sets the global SpillCallBackInjector to the new value, returning the old
+ * value.
+ *
+ * @param spillInjector the new implementation for the spill injector.
+ * @return the previous implementation.
+ */
+ public static SpillCallBackInjector getAndSet(
+ SpillCallBackInjector spillInjector) {
+ SpillCallBackInjector prev = instance;
+ instance = spillInjector;
+ return prev;
+ }
+
+ public void writeSpillIndexFileCB(Path path) {
+ // do nothing
+ }
+
+ public void writeSpillFileCB(Path path, FSDataOutputStream out,
+ Configuration conf) {
+ // do nothing
+ }
+
+ public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+ // do nothing
+ }
+
+ public String getSpilledFileReport() {
+ return null;
+ }
+
+ public void handleErrorInSpillFill(Path path, Exception e) {
+ // do nothing
+ }
+
+ public void corruptSpilledFile(Path fileName) throws IOException {
+ // do nothing
+ }
+
+ public void addSpillIndexFileCB(Path path, Configuration conf) {
+ // do nothing
+ }
+
+ public void validateSpillIndexFileCB(Path path, Configuration conf) {
+ // do nothing
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java
new file mode 100644
index 00000000000..68a6d7360da
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoStreamUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * An implementation class that keeps track of the spilled files.
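+ * For each spilled path it records the stream positions that were written,
+ * keeping encrypted and non-encrypted spills apart, so tests can assert that
+ * encryption actually took place and that reads start at recorded offsets.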
+ */
+public class SpillCallBackPathsFinder extends SpillCallBackInjector {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SpillCallBackPathsFinder.class);
+ /**
+ * Encrypted spilled files.
+ */
+ private final ConcurrentHashMap<Path, Set<Long>> encryptedSpillFiles =
+ new ConcurrentHashMap<>();
+ /**
+ * Non-Encrypted spilled files.
+ */
+ private final ConcurrentHashMap<Path, Set<Long>> spillFiles =
+ new ConcurrentHashMap<>();
+ /**
+ * Invalid position access.
+ */
+ private final ConcurrentHashMap<Path, Set<Long>> invalidAccessMap =
+ new ConcurrentHashMap<>();
+ /**
+ * Index spill files.
+ */
+ private final Set<Path> indexSpillFiles =
+ Collections.newSetFromMap(new WeakHashMap<Path, Boolean>());
+ /**
+ * Paths that were not found in the maps.
+ */
+ private final Set<Path> negativeCache =
+ Collections.newSetFromMap(new WeakHashMap<Path, Boolean>());
+
+ protected ConcurrentHashMap<Path, Set<Long>> getFilesMap(
+ Configuration config) {
+ if (CryptoUtils.isEncryptedSpillEnabled(config)) {
+ return encryptedSpillFiles;
+ }
+ return spillFiles;
+ }
+
+ @Override
+ public void writeSpillFileCB(Path path, FSDataOutputStream out,
+ Configuration conf) {
+ long outPos = 0;
+ try {
+ outPos = out.getPos();
+ Set<Long> positions = getFilesMap(conf).get(path);
+ if (positions == null) {
+ Set<Long> newPositions =
+ Collections.newSetFromMap(new WeakHashMap<Long, Boolean>());
+ positions = getFilesMap(conf).putIfAbsent(path, newPositions);
+ if (positions == null) {
+ positions = newPositions;
+ }
+ }
+ positions.add(outPos);
+ } catch (IOException e) {
+ LOG.debug("writeSpillFileCB.. exception getting position of the stream."
+ + " path:{}; pos:{}", path, outPos, e);
+ }
+ LOG.debug("writeSpillFileCB.. path:{}; pos:{}", path, outPos);
+ }
+
+ @Override
+ public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+ if (path == null) {
+ return;
+ }
+ Set<Long> pathEntries = getFilesMap(conf).get(path);
+ if (pathEntries != null) {
+ try {
+ long isPos = CryptoStreamUtils.getInputStreamOffset(is);
+ if (pathEntries.contains(isPos)) {
+ LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos);
+ return;
+ }
+ Set<Long> positions = invalidAccessMap.get(path);
+ if (positions == null) {
+ Set<Long> newPositions =
+ Collections.newSetFromMap(new WeakHashMap<Long, Boolean>());
+ positions = invalidAccessMap.putIfAbsent(path, newPositions);
+ if (positions == null) {
+ positions = newPositions;
+ }
+ }
+ positions.add(isPos);
+ LOG.debug("getSpillFileCB... access incorrect position.. "
+ + "Path {}; Pos: {}", path, isPos);
+ } catch (IOException e) {
+ LOG.error("Could not get inputStream position.. Path {}", path, e);
+ // do nothing
+ }
+ return;
+ }
+ negativeCache.add(path);
+ LOG.warn("getSpillFileCB.. Could not find spilled file .. Path: {}", path);
+ }
+
+ @Override
+ public String getSpilledFileReport() {
+ StringBuilder strBuilder =
+ new StringBuilder("\n++++++++ Spill Report ++++++++")
+ .append(dumpMapEntries("Encrypted Spilled Files",
+ encryptedSpillFiles))
+ .append(dumpMapEntries("Non-Encrypted Spilled Files",
+ spillFiles))
+ .append(dumpMapEntries("Invalid Spill Access",
+ invalidAccessMap))
+ .append("\n ----- Spilled Index Files ----- ")
+ .append(indexSpillFiles.size());
+ for (Path p : indexSpillFiles) {
+ strBuilder.append("\n\t index-path: ").append(p.toString());
+ }
+ strBuilder.append("\n ----- Negative Cache files ----- ")
+ .append(negativeCache.size());
+ for (Path p : negativeCache) {
+ strBuilder.append("\n\t path: ").append(p.toString());
+ }
+ return strBuilder.toString();
+ }
+
+ @Override
+ public void addSpillIndexFileCB(Path path, Configuration conf) {
+ if (path == null) {
+ return;
+ }
+ indexSpillFiles.add(path);
+ LOG.debug("addSpillIndexFileCB... Path: {}", path);
+ }
+
+ @Override
+ public void validateSpillIndexFileCB(Path path, Configuration conf) {
+ if (path == null) {
+ return;
+ }
+ if (indexSpillFiles.contains(path)) {
+ LOG.debug("validateSpillIndexFileCB.. Path: {}", path);
+ return;
+ }
+ LOG.warn("validateSpillIndexFileCB.. could not retrieve indexFile.. "
+ + "Path: {}", path);
+ negativeCache.add(path);
+ }
+
+ public Set<Path> getEncryptedSpilledFiles() {
+ return Collections.unmodifiableSet(encryptedSpillFiles.keySet());
+ }
+
+ /**
+ * Gets the path:position entries that were accessed at an incorrect offset.
+ * @return a set of strings in the format of {@literal Path[Pos]}.
+ */
+ public Set<String> getInvalidSpillEntries() {
+ Set<String> result = new LinkedHashSet<>();
+ for (Entry<Path, Set<Long>> spillMapEntry: invalidAccessMap.entrySet()) {
+ for (Long singleEntry : spillMapEntry.getValue()) {
+ result.add(String.format("%s[%d]",
+ spillMapEntry.getKey(), singleEntry));
+ }
+ }
+ return result;
+ }
+
+ private String dumpMapEntries(String label,
+ Map<Path, Set<Long>> entriesMap) {
+ StringBuilder strBuilder =
+ new StringBuilder(String.format("%n ----- %s ----- %d", label,
+ entriesMap.size()));
+ for (Entry<Path, Set<Long>> encryptedSpillEntry
+ : entriesMap.entrySet()) {
+ strBuilder.append(String.format("%n\t\tpath: %s",
+ encryptedSpillEntry.getKey()));
+ for (Long singlePos : encryptedSpillEntry.getValue()) {
+ strBuilder.append(String.format("%n\t\t\tentry: %d", singlePos));
+ }
+ }
+ return strBuilder.toString();
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java
new file mode 100644
index 00000000000..451e6f65503
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+/**
+ * Helper classes for the shuffle/spill encryptions.
+ */
+package org.apache.hadoop.mapreduce.security;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 96ae445ba9f..5741ec771cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
@@ -512,7 +513,9 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
}
InputStream is = input;
- is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+ is =
+ IntermediateEncryptedStream.wrapIfNecessary(jobConf, is,
+ compressedLength, null);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index f45742fe5dd..49d10d58be0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
/**
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -151,7 +152,9 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
try {
- inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+ inStream =
+ IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
+ mapOutputFileName);
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
decompressedLength, metrics, reporter);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index f26c10ac592..0c82a6db3ee 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -55,7 +55,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -467,7 +469,9 @@ public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
- FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ FSDataOutputStream out =
+ IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+ rfs.create(outputPath), outputPath);
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -551,7 +553,9 @@ public void merge(List<CompressAwarePath> inputs) throws IOException {
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
- FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ FSDataOutputStream out =
+ IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+ rfs.create(outputPath), outputPath);
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -734,7 +738,9 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);
- FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+ FSDataOutputStream out =
+ IntermediateEncryptedStream.wrapIfNecessary(job,
+ fs.create(outputPath), outputPath);
Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
codec, null, true);
try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index f22169d282a..1bd1916ce99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import com.google.common.annotations.VisibleForTesting;
@@ -83,7 +83,8 @@ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
this.fs = fs;
this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher);
- disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+ disk = IntermediateEncryptedStream.wrapIfNecessary(conf,
+ fs.create(tmpOutputPath), tmpOutputPath);
}
@VisibleForTesting
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
index 4e4e78e1e3c..4319e174168 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
@@ -17,14 +17,22 @@
*/
package org.apache.hadoop.mapreduce.util;
+import java.io.File;
import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* A class that contains utility methods for MR Job configuration.
*/
public final class MRJobConfUtil {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MRJobConfUtil.class);
public static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)";
/**
@@ -130,4 +138,54 @@ public static long getTaskProgressWaitDeltaTimeThreshold() {
public static double convertTaskProgressToFactor(final float progress) {
return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
}
+
+ /**
+ * For unit tests, use urandom to prevent the YarnChild process from hanging
+ * on systems with low entropy.
+ */
+ private static final String TEST_JVM_SECURITY_EGD_OPT =
+ "-Djava.security.egd=file:/dev/./urandom";
+
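+ /**
+ * Returns a configuration with intermediate-data encryption enabled and the
+ * AM and task JVMs pointed at /dev/urandom for tests; creates a new
+ * {@link Configuration} when {@code conf} is null.
+ */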
+ public static Configuration initEncryptedIntermediateConfigsForTesting(
+ Configuration conf) {
+ Configuration config =
+ (conf == null) ? new Configuration(): conf;
+ final String childJVMOpts =
+ TEST_JVM_SECURITY_EGD_OPT.concat(" ")
+ .concat(config.get("mapred.child.java.opts", " "));
+ // Set the jvm arguments.
+ config.set("yarn.app.mapreduce.am.admin-command-opts",
+ TEST_JVM_SECURITY_EGD_OPT);
+ config.set("mapred.child.java.opts", childJVMOpts);
+ config.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
+ return config;
+ }
+
+ /**
+ * Sets the local directories so that the generated folders are
+ * subdirectories of the test root directory.
+ * @param conf the configuration to update; a new one is created when null.
+ * @param testRootDir the test directory used as the parent of the folders.
+ * @return the updated configuration.
+ */
+ public static Configuration setLocalDirectoriesConfigForTesting(
+ Configuration conf, File testRootDir) {
+ Configuration config =
+ (conf == null) ? new Configuration(): conf;
+ final File hadoopLocalDir = new File(testRootDir, "hadoop-dir");
+ // create the directory
+ if (!hadoopLocalDir.getAbsoluteFile().mkdirs()) {
+ LOG.info("{} directory already exists", hadoopLocalDir.getPath());
+ }
+ Path mapredHadoopTempDir = new Path(hadoopLocalDir.getPath());
+ Path mapredSystemDir = new Path(mapredHadoopTempDir, "system");
+ Path stagingDir = new Path(mapredHadoopTempDir, "tmp/staging");
+ // Set the temp directories a subdir of the test directory.
+ config.set("mapreduce.jobtracker.staging.root.dir", stagingDir.toString());
+ config.set("mapreduce.jobtracker.system.dir", mapredSystemDir.toString());
+ config.set("mapreduce.cluster.temp.dir", mapredHadoopTempDir.toString());
+ config.set("mapreduce.cluster.local.dir",
+ new Path(mapredHadoopTempDir, "local").toString());
+ return config;
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index a6b19646cf1..732e4783d9c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -23,6 +23,7 @@
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,43 +50,62 @@
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestMerger {
- private Configuration conf;
+ private static File testRootDir;
+ @Rule
+ public TestName unitTestName = new TestName();
+ private File unitTestDir;
private JobConf jobConf;
private FileSystem fs;
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ // setup the test root directory
+ testRootDir =
+ GenericTestUtils.setupTestRootDir(
+ TestMerger.class);
+ }
+
@Before
public void setup() throws IOException {
- conf = new Configuration();
+ unitTestDir = new File(testRootDir, unitTestName.getMethodName());
+ unitTestDir.mkdirs();
jobConf = new JobConf();
- fs = FileSystem.getLocal(conf);
+ // Set the temp directories a subdir of the test directory.
+ MRJobConfUtil.setLocalDirectoriesConfigForTesting(jobConf, unitTestDir);
+ jobConf.set(MRConfig.FRAMEWORK_NAME, "local");
+ fs = FileSystem.getLocal(jobConf);
}
@Test
public void testEncryptedMerger() throws Throwable {
- jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
- conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ // Enable intermediate encryption.
+ MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(jobConf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setEncryptedSpillKey(new byte[16], credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
@@ -105,8 +125,8 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
MergeManagerImpl mergeManager = new MergeManagerImpl(
- reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
- null, null, new Progress(), new MROutputFiles());
+ reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+ null, null, null, new Progress(), new MROutputFiles());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
@@ -114,12 +134,12 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
map1.put("carrot", "delicious");
Map<String, String> map2 = new TreeMap<String, String>();
map1.put("banana", "pretty good");
- byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
- byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+ byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+ byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
- conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+ jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
- conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+ jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
@@ -149,12 +169,12 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
map3.put("carrot", "amazing");
Map map4 = new TreeMap();
map4.put("banana", "bla");
- byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
- byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+ byte[] mapOutputBytes3 = writeMapOutput(jobConf, map3);
+ byte[] mapOutputBytes4 = writeMapOutput(jobConf, map4);
InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
- conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+ jobConf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
- conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+ jobConf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
mapOutputBytes3.length);
System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
@@ -173,12 +193,13 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
- Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
+ Iterator<CompressAwarePath> iterator =
+ mergeManager.onDiskMapOutputs.iterator();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
while (iterator.hasNext()) {
CompressAwarePath next = iterator.next();
- readOnDiskMapOutput(conf, fs, next, keys, values);
+ readOnDiskMapOutput(jobConf, fs, next, keys, values);
paths.add(next);
}
Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
@@ -186,8 +207,8 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
mergeManager.close();
mergeManager = new MergeManagerImpl(
- reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
- null, null, new Progress(), new MROutputFiles());
+ reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+ null, null, null, new Progress(), new MROutputFiles());
MergeThread onDiskMerger = mergeManager.createOnDiskMerger();
onDiskMerger.merge(paths);
@@ -196,7 +217,8 @@ public void testInMemoryAndOnDiskMerger() throws Throwable {
keys = new ArrayList<String>();
values = new ArrayList<String>();
- readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+ readOnDiskMapOutput(jobConf, fs,
+ mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
@@ -222,7 +244,8 @@ private byte[] writeMapOutput(Configuration conf, Map keysToValu
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
List<String> keys, List<String> values) throws IOException {
- FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+ FSDataInputStream in =
+ IntermediateEncryptedStream.wrapIfNecessary(conf, fs.open(path), path);
IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
fs.getFileStatus(path).getLen(), null, null);
@@ -252,14 +275,15 @@ public void testUncompressed() throws IOException {
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
- Path tmpDir = new Path("localpath");
+ Path tmpDir = new Path(jobConf.get("mapreduce.cluster.temp.dir"),
+ "localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
- RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+ RawKeyValueIterator mergeQueue = Merger.merge(jobConf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index a3ea26e81f0..c8b6c894d0c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -31,8 +31,20 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
@@ -41,8 +53,39 @@
* -jt local -libjars
*/
public class TestLocalJobSubmission {
- private static Path TEST_ROOT_DIR =
- new Path(System.getProperty("test.build.data","/tmp"));
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestLocalJobSubmission.class);
+
+ private static File testRootDir;
+
+ @Rule
+ public TestName unitTestName = new TestName();
+ private File unitTestDir;
+ private Path jarPath;
+ private Configuration config;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ // setup the test root directory
+ testRootDir =
+ GenericTestUtils.setupTestRootDir(TestLocalJobSubmission.class);
+ }
+
+ @Before
+ public void setup() throws IOException {
+ unitTestDir = new File(testRootDir, unitTestName.getMethodName());
+ unitTestDir.mkdirs();
+ config = createConfig();
+ jarPath = makeJar(new Path(unitTestDir.getAbsolutePath(), "test.jar"));
+ }
+
+ private Configuration createConfig() {
+ // Set the temp directories a subdir of the test directory.
+ Configuration conf =
+ MRJobConfUtil.setLocalDirectoriesConfigForTesting(null, unitTestDir);
+ conf.set(MRConfig.FRAMEWORK_NAME, "local");
+ return conf;
+ }
/**
* Test the local job submission options of -jt local -libjars.
@@ -51,12 +94,9 @@ public class TestLocalJobSubmission {
*/
@Test
public void testLocalJobLibjarsOption() throws IOException {
- Configuration conf = new Configuration();
-
- testLocalJobLibjarsOption(conf);
-
- conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
- testLocalJobLibjarsOption(conf);
+ testLocalJobLibjarsOption(config);
+ config.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
+ testLocalJobLibjarsOption(config);
}
/**
@@ -67,8 +107,6 @@ public void testLocalJobLibjarsOption() throws IOException {
*/
private void testLocalJobLibjarsOption(Configuration conf)
throws IOException {
- Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
conf.set(MRConfig.FRAMEWORK_NAME, "local");
final String[] args = {
@@ -79,8 +117,7 @@ private void testLocalJobLibjarsOption(Configuration conf)
try {
res = ToolRunner.run(conf, new SleepJob(), args);
} catch (Exception e) {
- System.out.println("Job failed with " + e.getLocalizedMessage());
- e.printStackTrace(System.out);
+ LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
@@ -93,18 +130,20 @@ private void testLocalJobLibjarsOption(Configuration conf)
*/
@Test
public void testLocalJobEncryptedIntermediateData() throws IOException {
- Configuration conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, "local");
- conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ config = MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(config);
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
- res = ToolRunner.run(conf, new SleepJob(), args);
+ SpillCallBackPathsFinder spillInjector =
+ (SpillCallBackPathsFinder) IntermediateEncryptedStream
+ .setSpillCBInjector(new SpillCallBackPathsFinder());
+ res = ToolRunner.run(config, new SleepJob(), args);
+ Assert.assertTrue("No spill occurred",
+ spillInjector.getEncryptedSpilledFiles().size() > 0);
} catch (Exception e) {
- System.out.println("Job failed with " + e.getLocalizedMessage());
- e.printStackTrace(System.out);
+ LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
@@ -116,15 +155,13 @@ public void testLocalJobEncryptedIntermediateData() throws IOException {
*/
@Test
public void testJobMaxMapConfig() throws Exception {
- Configuration conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, "local");
- conf.setInt(MRJobConfig.JOB_MAX_MAP, 0);
+ config.setInt(MRJobConfig.JOB_MAX_MAP, 0);
final String[] args = {
"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
};
int res = -1;
try {
- res = ToolRunner.run(conf, new SleepJob(), args);
+ res = ToolRunner.run(config, new SleepJob(), args);
fail("Job should fail");
} catch (IllegalArgumentException e) {
assertTrue(e.getLocalizedMessage().contains(
@@ -139,20 +176,16 @@ public void testJobMaxMapConfig() throws Exception {
*/
@Test
public void testLocalJobFilesOption() throws IOException {
- Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
- conf.set(MRConfig.FRAMEWORK_NAME, "local");
- final String[] args =
- {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
- "-mt", "1", "-rt", "1"};
+ config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+ final String[] args = {
+ "-jt", "local", "-files", jarPath.toString(),
+ "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+ };
int res = -1;
try {
- res = ToolRunner.run(conf, new SleepJob(), args);
+ res = ToolRunner.run(config, new SleepJob(), args);
} catch (Exception e) {
- System.out.println("Job failed with " + e.getLocalizedMessage());
- e.printStackTrace(System.out);
+ LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
@@ -165,27 +198,22 @@ public void testLocalJobFilesOption() throws IOException {
*/
@Test
public void testLocalJobArchivesOption() throws IOException {
- Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
- conf.set(MRConfig.FRAMEWORK_NAME, "local");
+ config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
final String[] args =
{"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
"1", "-mt", "1", "-rt", "1"};
int res = -1;
try {
- res = ToolRunner.run(conf, new SleepJob(), args);
+ res = ToolRunner.run(config, new SleepJob(), args);
} catch (Exception e) {
- System.out.println("Job failed with " + e.getLocalizedMessage());
- e.printStackTrace(System.out);
+ LOG.error("Job failed with {}" + e.getLocalizedMessage(), e);
fail("Job failed");
}
assertEquals("dist job res is not 0:", 0, res);
}
private Path makeJar(Path p) throws IOException {
- FileOutputStream fos = new FileOutputStream(new File(p.toString()));
+ FileOutputStream fos = new FileOutputStream(p.toString());
JarOutputStream jos = new JarOutputStream(fos);
ZipEntry ze = new ZipEntry("test.jar.inside");
jos.putNextEntry(ze);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
deleted file mode 100644
index fa8dacf6dd5..00000000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings(value={"unchecked", "deprecation"})
-/**
- * This test tests the support for a merge operation in Hadoop. The input files
- * are already sorted on the key. This test implements an external
- * MapOutputCollector implementation that just copies the records to different
- * partitions while maintaining the sort order in each partition. The Hadoop
- * framework's merge on the reduce side will merge the partitions created to
- * generate the final output which is sorted on the key.
- */
-@RunWith(Parameterized.class)
-public class TestMRIntermediateDataEncryption {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
- /**
- * Use urandom to avoid the YarnChild process from hanging on low entropy
- * systems.
- */
- private static final String JVM_SECURITY_EGD_OPT =
- "-Djava.security.egd=file:/dev/./urandom";
- // Where MR job's input will reside.
- private static final Path INPUT_DIR = new Path("/test/input");
- // Where output goes.
- private static final Path OUTPUT = new Path("/test/output");
- private static final int NUM_LINES = 1000;
- private static MiniMRClientCluster mrCluster = null;
- private static MiniDFSCluster dfsCluster = null;
- private static FileSystem fs = null;
- private static final int NUM_NODES = 2;
-
- private final String testTitle;
- private final int numMappers;
- private final int numReducers;
- private final boolean isUber;
-
- /**
- * List of arguments to run the JunitTest.
- * @return
- */
- @Parameterized.Parameters(
- name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
- + "mappers:{1}, reducers:{2}, isUber:{3})")
- public static Collection